Kafka-node: How to get Message Timestamps from Consumer or Offset?

Created on 20 Jul 2017 · 6 Comments · Source: SOHU-Co/kafka-node

How do I consume the message timestamps?

Hi Everyone! Great work here with kafka-node!

I can consume messages successfully, but I cannot see the timestamp of each message. As of version 0.10.2.0, Kafka should store a timestamp on every record in the partition, but I don't know how to query it.

Here is my simple code example:
~~~javascript
// configuration
const TOPIC_NAME = 'test2';

const kafka = require('kafka-node');
const Consumer = kafka.Consumer;
const client = new kafka.Client('localhost:2181'); // ZooKeeper connection string; replace with your host
const offset = new kafka.Offset(client);

// time: -1 requests the latest offset of the partition (-2 would request the earliest)
offset.fetch([{ topic: TOPIC_NAME, partition: 0, time: -1, maxNum: 1 }], function (err, data) {
  if (err) {
    console.log(err);
    return;
  }

  const consumer = new Consumer(
    client,
    [
      { topic: TOPIC_NAME, partition: 0, offset: data[TOPIC_NAME]['0'][0] }
    ],
    {
      groupId: 'pacs_consumer',
      autoCommit: false,
      fromOffset: true // start at the offset given in the payload above (the latest one)
    }
  );

  // consumer.setOffset(TOPIC_NAME, 0, 223179);

  consumer.on('message', function (message) {
    if (message && message.topic === TOPIC_NAME) {
      console.log(message);
    }
  });

  consumer.on('error', function (err) {
    console.log(err);
  });

  consumer.on('offsetOutOfRange', function (err) {
    console.log(err);
  });
});
~~~
And here is the output I get:

~~~
ubuntu@ip-10-0-0-178:~/kafka2mysql$ node kafka_tester.js
{ topic: 'test',
  value: '{"type":"gauge","value":0}',
  offset: 131,
  partition: 0,
  highWaterOffset: 133,
  key: 'local-1500469041571.driver.BlockManager.disk.diskSpaceUsed_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":366}',
  offset: 132,
  partition: 0,
  highWaterOffset: 133,
  key: 'local-1500469041571.driver.BlockManager.memory.maxMem_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":0}',
  offset: 133,
  partition: 0,
  highWaterOffset: 158,
  key: 'local-1500469041571.driver.BlockManager.memory.memUsed_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":366}',
  offset: 134,
  partition: 0,
  highWaterOffset: 158,
  key: 'local-1500469041571.driver.BlockManager.memory.remainingMem_MB' }
^C
~~~

Just for some context...

I am producing metric data from Spark (with a Scala producer) and am trying to read the metrics from Kafka and eventually write them to a MongoDB and/or MySQL database.

Thanks again!

All 6 comments

Thank you for your question. There is an open PR that adds timestamp support (#693), but before we can merge it we need to add support for versioned APIs to kafka-node. Versioned APIs will help us maintain compatibility with older versions of Kafka.

This should be available in 2.2.0.

Any idea whether this is fixed? The issue still seems to be there with a KSQL client:

~~~
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
~~~

Hi @hyperlink, I'm using version 4.1.0. Is there a way to get the Kafka timestamp when consuming messages?
Thank you

If the timestamp is there, it would be an attribute called `timestamp` on the message object.
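A minimal sketch of reading that attribute defensively. `getMessageTimestampMs` is a hypothetical helper name, and the exact type kafka-node uses for `message.timestamp` is an assumption here, so the helper accepts either a `Date` or a number of epoch milliseconds. It also treats a missing attribute (a fetch path that does not decode timestamps) and negative values as "no timestamp", since Kafka uses -1 for records written without one, which is the situation the KSQL error above describes.

```javascript
// Hypothetical helper: returns the record timestamp in epoch milliseconds,
// or null when the message carries no usable timestamp.
function getMessageTimestampMs(message) {
  if (!message || message.timestamp === undefined || message.timestamp === null) {
    // Attribute absent: the fetch request/response versions in use did not decode one
    return null;
  }
  // Assumption: the value is either a Date or a number of epoch milliseconds
  const ts = message.timestamp instanceof Date
    ? message.timestamp.getTime()
    : Number(message.timestamp);
  // NaN covers invalid dates; negative values (e.g. -1) mean "no timestamp" in Kafka
  if (!Number.isFinite(ts) || ts < 0) {
    return null;
  }
  return ts;
}
```

In a `consumer.on('message', ...)` handler, a `null` result from this helper is a sign that the consumer is not fetching with a protocol version that carries timestamps, rather than that the broker lacks them.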

> If the timestamp is there, it would be an attribute called `timestamp` on the message object.

I was only able to get a timestamp in my consumed messages after modifying `sendFetchRequest` in the `baseClient.js` source code to use the v2 fetch-request encoder and the v1 fetch-response decoder.

I changed the following:

~~~javascript
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
  // TODO: state validator for HLC for ignoring stale fetch requests
  var decoder = protocol.decodeFetchResponse(this._createMessageHandler(consumer), maxTickMessages);

  // ...
};
~~~

to this:

~~~javascript
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
  var encoder = protocol.encodeFetchRequestV2(fetchMaxWaitMs, fetchMinBytes);
  // TODO: state validator for HLC for ignoring stale fetch requests
  var decoder = protocol.decodeFetchResponseV1(this._createMessageHandler(consumer), maxTickMessages);

  // ...
};
~~~

It seems like the fetch v1 and v2 functions have existed since version 2.6.0 (https://github.com/SOHU-Co/kafka-node/pull/871), but they aren't invoked anywhere in the source code. Are there any plans to make this configurable?
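One way that configurability could look is sketched below. It is hypothetical: `selectFetchCodec` and the `fetchVersion` setting are invented for illustration and are not part of kafka-node. The sketch only assumes that `protocol` exposes the four functions named in the snippets above.

```javascript
// Hypothetical: pick the fetch encoder/decoder pair from a config value instead of
// hard-coding it in sendFetchRequest. `fetchVersion` is an invented option.
function selectFetchCodec(protocol, fetchVersion) {
  if (fetchVersion === 2) {
    // v2 request encoder + v1 response decoder: the pairing that produced timestamps
    return {
      encodeRequest: (...args) => protocol.encodeFetchRequestV2(...args),
      decodeResponse: (...args) => protocol.decodeFetchResponseV1(...args)
    };
  }
  // Any other value: fall back to the pair sendFetchRequest currently uses
  return {
    encodeRequest: (...args) => protocol.encodeFetchRequest(...args),
    decodeResponse: (...args) => protocol.decodeFetchResponse(...args)
  };
}
```

Inside `sendFetchRequest`, the two hard-coded calls would then become `codec.encodeRequest(fetchMaxWaitMs, fetchMinBytes)` and `codec.decodeResponse(this._createMessageHandler(consumer), maxTickMessages)`, with `codec` obtained from `selectFetchCodec(protocol, ...)` and some client option. Wrapping each call in an arrow function keeps it invoked on the `protocol` object, so the sketch still works if those functions rely on `this`.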
