Sarama Version: master
Kafka Version: 0.10.0.1
Go Version: 1.7
Configuration: default for Sarama and Kafka
[sarama] 2016/09/27 21:09:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2016/09/27 21:09:53 Connected to broker at kafka.castle:9092 (registered as #1001)
[sarama] 2016/09/27 21:09:53 consumer/broker/1001 added subscription to raw_status/0
2016/09/27 21:09:53 Consumed message time:topic:partition:offset:message 0001-01-01 00:00:00 +0000 UTC:raw_status:0:0:[123 34 116 105 109 101 115 116 97 109 112 34 58 32 49 ...
Why is msg.Timestamp unset (it prints as the zero time) on my consumed message? Do I need to enable this somewhere, in the Sarama configuration or in the Kafka configuration?
I tried sending messages via the http_server.go provided in the examples directory, as well as via a Python producer where I set the timestamp manually in the send method.
Any clue?
Thanks a lot,
Albin.
PS: Here is my consumer, copied directly from your examples:
package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	defer func() {
		if localErr := consumer.Close(); localErr != nil {
			log.Fatalln(localErr)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("raw_status", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message time:topic:partition:offset:message %s:%s:%d:%d:%v:%d\n", msg.Timestamp, msg.Topic, msg.Partition, msg.Offset, msg.Value, len(msg.Value))
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}
Off the top of my head you need to create a config struct (NewConfig()) and set the Version value appropriately to tell Sarama it is safe to use the more recent API versions, then pass this configuration into the NewConsumer call.
It works now: 2016-09-30 10:41:17.909 +0200 CEST!
Thanks a lot @eapache, and thanks as well for the awesome work on this client. Have a nice weekend !
Hey @eapache ,
After my successful test with a Python producer, I tried to feed my consumer using the Sarama async producer, but the timestamp is not set.
I retried my consumer with the Python producer to be 100% certain that the issue comes from the producer, and with it the Timestamp is still set on the message received by the consumer.
Any idea why my timestamp is not set? I've also tried to set the Timestamp manually via time.Now(), but then Kafka emits errors like this:
[2016-10-04 13:38:29,113] ERROR [Replica Manager on Broker 1001]: Error processing append operation on partition topic-8 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1
at kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:143)
at kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:111)
...
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
Here is my complete producer code :
package scheduler

import (
	"time"

	"github.com/Shopify/sarama"
	log "github.com/Sirupsen/logrus"
	redis "gopkg.in/redis.v4"
)

func Schedule(brokerList []string, redisHost string, topic string) {
	// Configure sarama logger
	sarama.Logger = log.New()

	// Create new producer
	producer, err := newAsyncProducer(brokerList)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err = producer.Close(); err != nil {
			log.Fatal("Failed to close producer:", err)
		}
	}()

	// Init Redis connection
	client := redis.NewClient(&redis.Options{
		Addr:     redisHost,
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	// Get all the things
	things, err := client.Keys("keys:*").Result()
	if err != nil {
		panic(err)
	}

	// Send them all
	for _, thing := range things {
		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(thing),
		}
		log.Infof("Send %s on %s", thing, topic)
	}
}

// newAsyncProducer creates a new asynchronous Kafka producer
func newAsyncProducer(brokerList []string) (sarama.AsyncProducer, error) {
	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
	config.Version = sarama.V0_10_0_0

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		return nil, err
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Error("Failed to write entry:", err)
		}
	}()

	return producer, nil
}
Thanks a lot for your support !
> I've also tried to set the Timestamp manually via time.Now()
This is the correct method. We should probably add support for auto-setting the timestamp to time.Now but we leave it blank by default for now.
> java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1
I _think_ this is a Sarama bug when using both timestamps and compression. Can you try with CompressionNone? If that works, please file a separate issue and I'll take a look at fixing it.
It works: time:topic:partition:offset:message 2016-10-04 20:30:39.092 +0200 CEST. Opening another issue!
Thanks,
Albin.