Kafka-node: HighLevelProducer does not work embedding a timestamp.

Created on 8 Sep 2017  ·  20 Comments  ·  Source: SOHU-Co/kafka-node

Bug Report

HighLevelProducer does not work embedding a timestamp.

Environment

  • Node version: v6.1.0
  • Kafka-node version: v2.2.2
  • Kafka version: 0.10.0.2

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 10

Include Sample Code to produce behavior

producer.send([
  {
    topic: topic,
    messages: messages,
    timestamp: Date.now()
  }
], cb);

Include Sample Code to consumer behavior

Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "borker1:9092");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();

    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("newSignals");

    KStream<Windowed<String>, Long> wordCounts = textLines
        .mapValues(textLine -> {
          System.out.println(textLine);
          return textLine.toLowerCase();
        })
        .groupBy((key, word) -> {
          System.out.println(word.split("\\W+")[2]);
          return word.split("\\W+")[2];
        })
        .count(TimeWindows.of(60000), "Counts")
        .toStream();
    wordCounts.to(windowedSerde, Serdes.Long(), "test");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

Include output with Debug turned on


Include output Java consumer log

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = newSignals, partition = 0, offset = 0, CreateTime = -1, checksum = 2486091252, serialized key size = -1, serialized value size = 126, key = null, value = {"device_id":"12345","rssi":-21,"ts":1504859056569692,"square":"1234","shop":"1234"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
    at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)
    at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:60)
    at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:86)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

Thanks for your contribution!

bug producer

All 20 comments

I've seen the same problem with both the HighLevelProducer and Producer using Node 6.10.0 and Kafka 0.10.1.1.

I'm able to inspect timestamps on kafka using this command line (obviously changing the name of the topic):

./kafka-simple-consumer-shell.sh --broker-list localhost:9092 --partition 0 --property print.timestamp=true --offset -2 --topic CommandAndControl

But the timestamps are always -1 even if I explicitly set the timestamp field to a specific value.

I am also getting the same issue now. Java Streams expects a timestamp, but this node module is writing the timestamp as -1. Please let me know if anybody has a solution for this.

Looks like this is related to issue #812 and not with the producer. I was able to verify the timestamps set by kafka-node through the console consumer but the broker's response message omits the magic byte 1 and the timestamp data... 🤔

21:00 $ ./kafka-console-consumer.sh --from-beginning --bootstrap-server 127.0.0.1:9092 --topic af2bc514-9ce0-4817-b3b0-d7108a453dd6 --property print.timestamp=true
CreateTime:1518573433281    this is my message
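
When checking a longer console-consumer dump for records without a usable timestamp, a small parser can help. The following is a minimal sketch, assuming each line starts with a `CreateTime:<millis>` or `LogAppendTime:<millis>` prefix followed by whitespace, as in the output above. The class and method names are hypothetical, and lines without such a prefix are treated as lacking a timestamp.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ConsoleTimestampCheck {
    // Matches the "<TimestampType>:<millis>" prefix printed when print.timestamp=true.
    // Assumption: the prefix is followed by a whitespace separator (a tab by default).
    private static final Pattern PREFIX =
            Pattern.compile("^(CreateTime|LogAppendTime):(-?\\d+)\\s");

    /** Returns the timestamp found in the prefix, or empty if the line has no such prefix. */
    static OptionalLong parseTimestamp(String line) {
        Matcher m = PREFIX.matcher(line);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(2)))
                : OptionalLong.empty();
    }

    /** True when the line carries no usable timestamp: no prefix, or a negative value such as -1. */
    static boolean lacksTimestamp(String line) {
        OptionalLong ts = parseTimestamp(line);
        return !ts.isPresent() || ts.getAsLong() < 0;
    }

    public static void main(String[] args) {
        String good = "CreateTime:1518573433281\tthis is my message";
        String bad = "CreateTime:-1\tthis is my message";
        System.out.println(lacksTimestamp(good)); // false
        System.out.println(lacksTimestamp(bad));  // true
    }
}
```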

Is there any fix for this issue?

The timestamps should be there as shown in the kafka-console-consumer output. Unfortunately, I do not know how to go about accessing it from the java client.

@hyperlink I think this needs to be re-opened. Our sink connector is also complaining that the records produced by this HighLevelProducer do not have a timestamp.

It is indeed strange that the timestamps are there according to console-consumer.

Is there any other message metadata that could be causing this issue? Is there a version number mismatch or something?

@t-d-d we should be using producer request version 2 with magic byte of 1 with timestamps.

@hyperlink I'm still seeing this issue in kafka-node version 2.6.1. I've set the apiVersion to 2 for the producer. Can you elaborate on how to set this? It's not documented.

The apiVersion and magic byte are set automatically based on the response of API request and what we can support. Unfortunately, I don't understand why the Java client isn't accepting the timestamp without digging into the Java code. Some help/pointers to the right area would be appreciated.

@hyperlink Thanks for your reply. Interestingly, this issue does not always happen in my case. But when it does, it stops my Java client from functioning, because Kafka Streams throws a fatal error. Here is the piece of Java code from Kafka Streams where it extracts the timestamp from a record: https://github.com/axbaretto/kafka/blob/5726125eea16aa67c2a0375d8113ebc3b847d27b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java.

I think I have the same issue using the KSQL client:
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

Any idea how to fix this issue?

My workaround is to use a custom TimestampExtractor instead of the default TimestampExtractor (FailOnInvalidTimestamp) on the Kafka Streams client side. When the custom TimestampExtractor extracts a message's timestamp, it checks whether the timestamp is negative and, if so, returns the wall-clock time instead. This solution works in my case; however, it would be nice to get this issue fixed.

I’m not using Kafka Streams directly, but I’m using KSQL.
It would be very nice to fix this issue.

On 18 May 2018 at 11:50, Xin Han notifications@github.com wrote:

My workaround is to use a custom TimestampExtractor instead of the default TimestampExtractor (FailOnInvalidTimestamp) on the Kafka Streams client side. When the custom TimestampExtractor extracts a message's timestamp, it checks whether the timestamp is negative and, if so, returns the wall-clock time instead. This solution works in my case; however, it would be nice to get this issue fixed.


@HansonHH Do you have any code snippets for a custom TimestampExtractor?

@vamshireddy21 Here is a simple example.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // Timestamp embedded in the record; negative (e.g. -1) when none was set.
        final long timestamp = record.timestamp();

        // Fall back to wall-clock time instead of failing on an invalid timestamp.
        if (timestamp < 0) {
            return System.currentTimeMillis();
        }

        return timestamp;
    }
}
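
The extractor only takes effect once the Streams application is configured to use it. The following is a minimal sketch of that step. To keep it compilable without the Kafka Streams jar, it uses plain string keys. It assumes the key `default.timestamp.extractor`, which to my knowledge applies to Kafka Streams 0.11 and later (0.10.x releases used `timestamp.extractor`). The helper name `withExtractor` and the host name are hypothetical.

```java
import java.util.Properties;

final class ExtractorConfigSketch {
    // Assumption: key name for Kafka Streams 0.11+; 0.10.x releases used "timestamp.extractor".
    static final String EXTRACTOR_KEY = "default.timestamp.extractor";

    /** Returns a copy of the given config with the timestamp extractor class set. */
    static Properties withExtractor(Properties base, String extractorClassName) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(EXTRACTOR_KEY, extractorClassName);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "test");
        base.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical host

        // In a real application, pass the fully qualified class name of the extractor.
        Properties config = withExtractor(base, "CustomTimestampExtractor");
        System.out.println(config.getProperty(EXTRACTOR_KEY));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor in place of the original config.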

Setting message.timestamp.type at the topic level worked in my scenario.

By default message.timestamp.type=CreateTime, which means that the producer must add the timestamp. If the producer does not add one, the exception above occurs in the Streams application.

Because we are using a third-party producer, we need to set message.timestamp.type=LogAppendTime.
With LogAppendTime, the broker adds a timestamp to each message when it receives it.

When a leader broker receives a message:

  1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
     • If the message is compressed, the timestamp in the wrapper message will be updated to the current server time. The broker will set the timestamp type bit in the wrapper message to 1 and ignore the inner message timestamps. This is done instead of writing the current server time to each message, to avoid the recompression penalty when people are using LogAppendTime.
     • If the message is not compressed, the timestamp in the message will be overwritten with the current server time.
  2. If message.timestamp.type=CreateTime:
     • If the time difference is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp in the compressed message to the largest timestamp of the inner messages.
     • If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.

message.timestamp.type and max.message.time.difference.ms will be per-topic configurations.
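
The rules above can be sketched as a small decision function. This is a simplified model for a single uncompressed message, not broker code: the compressed-wrapper handling is omitted, all names are hypothetical, and the exception is a local stand-in for the rejection named in the text.

```java
final class BrokerTimestampSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Local stand-in for the rejection named in the text above; not a Kafka class. */
    static final class TimestampExceededThresholdException extends RuntimeException {
        TimestampExceededThresholdException(String message) {
            super(message);
        }
    }

    /** Returns the timestamp that would be stored for one uncompressed message. */
    static long resolveTimestamp(TimestampType type, long producerTs,
                                 long brokerNowMs, long maxDiffMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // The broker overrides whatever the producer sent, including -1.
            return brokerNowMs;
        }
        // CreateTime: keep the producer timestamp only if it is close enough to broker time.
        long diff = Math.abs(brokerNowMs - producerTs);
        if (diff > maxDiffMs) {
            throw new TimestampExceededThresholdException(
                    "timestamp differs from broker time by " + diff + " ms");
        }
        return producerTs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A missing producer timestamp (-1) is replaced under LogAppendTime.
        System.out.println(resolveTimestamp(TimestampType.LOG_APPEND_TIME, -1L, now, 60_000L) == now);
        // A recent producer timestamp is kept under CreateTime.
        System.out.println(resolveTimestamp(TimestampType.CREATE_TIME, now - 1_000L, now, 60_000L) == now - 1_000L);
    }
}
```

The first call illustrates why the LogAppendTime workaround helps here: the stored timestamp no longer depends on what the producer embedded.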

This needs to be re-opened, the behavior I'm seeing with newer versions of kafka is consistent with others above. Thanks for the lib though!

This hit us again today. Not sure why it only happens on some services. Can someone point me at the code (file) that actually adds the timestamps?

I was getting this same issue even with Kafka client version > 2.0. The root cause for me was a very low timeout of 500 ms for connecting to the Kafka client, which made producer requests fall back to the default settings for Kafka version 0.9. Increasing this timeout fixed the issue for me.

const client = new KafkaNode.KafkaClient({
  kafkaHost: config.bootstrap_servers,
  versions: {
    // Raised from the low 500 ms value that triggered the fallback described above.
    requestTimeout: 10000
  }
});

It would make a lot of sense to document this in the README.md as one of the possible configurations.
