How is it possible to re-read messages from a given offset?
Right now I have to create a new group and read from the oldest offset, filtering until I reach the given offset. Is there a better way?
In your consumer group's Setup() callback, you can reset to an older offset:
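// Setup runs at the start of each session, before any claim is consumed.
func (e exampleConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
	// ResetOffset returns nothing; topic, partition and offset are defined elsewhere.
	sess.ResetOffset(topic, partition, offset, "")
	return nil
}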
The consumer group will now consume from offset.
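Since ResetOffset has no return value, one way to make a failed reset visible is to check the session's claims first. Below is a minimal sketch, not sarama's own code: `offsetSession`, `resetIfClaimed` and `fakeSession` are hypothetical names introduced here, and the local interface only mirrors the `Claims()` and `ResetOffset()` methods of `sarama.ConsumerGroupSession` so that the example compiles without the library. It assumes that a reset for a partition this member does not own has no effect, which is worth verifying against your sarama version.

```go
package main

import "fmt"

// offsetSession is a hypothetical local interface mirroring the two methods of
// sarama.ConsumerGroupSession used below, so the sketch builds without sarama.
type offsetSession interface {
	Claims() map[string][]int32
	ResetOffset(topic string, partition int32, offset int64, metadata string)
}

// resetIfClaimed resets the offset only when this session owns the partition.
// Otherwise it returns an error, so the caller has something to log.
func resetIfClaimed(sess offsetSession, topic string, partition int32, offset int64) error {
	for _, p := range sess.Claims()[topic] {
		if p == partition {
			sess.ResetOffset(topic, partition, offset, "")
			return nil
		}
	}
	return fmt.Errorf("partition %d of topic %q is not claimed by this session (claims: %v)",
		partition, topic, sess.Claims())
}

// fakeSession is a stand-in used only to exercise the helper in this example.
type fakeSession struct {
	claims map[string][]int32
	resets map[string]int64
}

func (f *fakeSession) Claims() map[string][]int32 { return f.claims }

func (f *fakeSession) ResetOffset(topic string, partition int32, offset int64, _ string) {
	f.resets[fmt.Sprintf("%s/%d", topic, partition)] = offset
}

func main() {
	sess := &fakeSession{
		claims: map[string][]int32{"events": {0, 2}},
		resets: map[string]int64{},
	}
	// Partition 2 is claimed, so the reset is recorded and no error is returned.
	fmt.Println(resetIfClaimed(sess, "events", 2, 100))
	// Partition 1 is not claimed, so an error describing the claims is returned.
	fmt.Println(resetIfClaimed(sess, "events", 1, 100))
}
```

In a real handler, Setup() could call resetIfClaimed(sess, topic, partition, offset) and return or log the error. With several members in the group, each session only claims some of the partitions, so an error on one member is expected as long as another member owns the partition.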
I would rather advise using the kafka-consumer-groups.sh command-line tool to set a new offset for the existing consumer group.
The CLI ships with every Kafka distribution.
I usually run it from a Kafka Docker image, because it is easier.
You need to temporarily shut down every consumer in the group and then run something like
kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group [GROUP] --reset-offsets --topic [TOPIC]:[PARTITION] --to-offset [OFFSET] --execute
This way there is no need to change the group or even the code, and you have much more flexibility.
Hi @cheesedosa, I've tried your code without success. As far as I can see, no error is returned, so how can I debug this problem?