Confluent-kafka-dotnet: How can I read only specific offsets from a Kafka topic?

Created on 4 Sep 2019  ·  3 comments  ·  Source: confluentinc/confluent-kafka-dotnet

Description

In a production environment, if a failure or system issue happens, how can I read one specific failed offset, or a range such as 3000 to 3020, from the topic? My actual latest offset is 5000, and I don't want to read everything from 3000 to 5000.

question

All 3 comments

You should use the Assign method rather than the Subscribe method if you want to start at a particular offset on a particular partition. Then, you can call Consume in a loop, checking the offset and stopping when you reach the offset you want to stop at.

Also worth noting: you can override the start offset explicitly in the handler set with SetPartitionsAssignedHandler by returning a TopicPartitionOffset collection rather than a TopicPartition collection.
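For readers on the Apache Kafka Java client (which the Scala code further down this thread uses), the closest analog I know of is a ConsumerRebalanceListener that seeks in onPartitionsAssigned. The sketch below is only an illustration of that idea, not the .NET API described above; the names StartOffsetOnAssign, SeekOnAssign and subscribeFrom are hypothetical.

```scala
import java.util.{Collection => JCollection, Collections}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object StartOffsetOnAssign {

  // Hypothetical helper: seeks each newly assigned partition to a chosen start offset.
  // Partitions without an entry in startOffsets keep their normal starting position.
  final class SeekOnAssign(consumer: Consumer[_, _], startOffsets: Map[TopicPartition, Long])
      extends ConsumerRebalanceListener {

    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

    // Caveat: this runs on every assignment, so a later rebalance rewinds again.
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
      partitions.asScala.foreach { tp =>
        startOffsets.get(tp).foreach(offset => consumer.seek(tp, offset))
      }
  }

  // Subscribes to one topic and installs the listener above.
  def subscribeFrom(consumer: Consumer[_, _], topic: String,
                    startOffsets: Map[TopicPartition, Long]): Unit =
    consumer.subscribe(Collections.singletonList(topic), new SeekOnAssign(consumer, startOffsets))
}
```

With the numbers from the question, this could be called as subscribeFrom(consumer, "topic", Map(new TopicPartition("topic", 1) -> 3000L)). Stopping at offset 3020 would still have to be handled in the poll loop.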

Thank you so much, Matt, for the quick response. I am adding my piece of code below. How can I read only a specific offset number with it? It could probably be done with seek. Please add the required lines to the code below. Thanks in advance.

CODE

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object KafkaConsumerAssign0App {

  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.put("bootstrap.servers", "abc900001.ABC.com.au:9092")
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.kerberos.service.name", "kafka")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "testconsumergroup")
    val consumer = new KafkaConsumer[String, String](props)

    // TopicPartition takes the topic name and the partition number as separate arguments.
    val tp0 = new TopicPartition("topic_ravi", 1)
    val tp1 = new TopicPartition("topic", 1)

    val topics = List(tp0, tp1)
    try {
      // assign (rather than subscribe) pins this consumer to the listed partitions.
      consumer.assign(topics.asJava)
      while (true) {
        // poll(Duration) replaces the deprecated poll(long) overload.
        val records = consumer.poll(Duration.ofMillis(10))
        for (record <- records.asScala) {
          println("Key: " + record.key() + ", Value: " + record.value() + ", Offset: " + record.offset())
        }
      }
    } finally {
      consumer.close() // always release the consumer, even if polling throws
    }
  }
}
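Applying the advice from the first comment to this code, a minimal sketch of reading only offsets 3000 to 3020 might look like the following. It assumes the Apache Kafka Java client 2.0 or later (for poll(Duration)), a single partition of interest, and the same connection settings as above. The names KafkaOffsetRangeApp, OffsetRange and readRange are hypothetical, and disabling auto-commit is my own assumption so that the replay does not move the group's committed offsets.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object KafkaOffsetRangeApp {

  // Inclusive offset range to replay, e.g. OffsetRange(3000L, 3020L).
  final case class OffsetRange(start: Long, end: Long) {
    require(start >= 0 && start <= end, "need 0 <= start <= end")
    def contains(offset: Long): Boolean = offset >= start && offset <= end
  }

  // Reads the records of one partition whose offsets fall inside `range`.
  def readRange(consumer: KafkaConsumer[String, String], tp: TopicPartition,
                range: OffsetRange): Vector[ConsumerRecord[String, String]] = {
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, range.start) // start exactly at the first wanted offset

    // endOffsets reports the offset of the next record to be written,
    // so the last readable offset is one less.
    val logEnd = consumer.endOffsets(Collections.singletonList(tp)).get(tp).longValue()
    val lastWanted = math.min(range.end, logEnd - 1)

    val out = Vector.newBuilder[ConsumerRecord[String, String]]
    // position(tp) is the next offset to fetch; stop once it has passed the range.
    while (consumer.position(tp) <= lastWanted) {
      val batch = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
      // A batch may run past the end of the range, so filter it.
      out ++= batch.filter(r => range.contains(r.offset()))
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "abc900001.ABC.com.au:9092")
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.kerberos.service.name", "kafka")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "testconsumergroup")
    props.put("enable.auto.commit", "false") // assumption: do not commit replayed offsets

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val records = readRange(consumer, new TopicPartition("topic", 1), OffsetRange(3000L, 3020L))
      records.foreach { r =>
        println("Key: " + r.key() + ", Value: " + r.value() + ", Offset: " + r.offset())
      }
    } finally {
      consumer.close()
    }
  }
}
```

The loop stops on the consumer position rather than on the last record seen, so gaps in the offsets (for example on compacted topics) should not stall it. It has no timeout, so an unreachable broker would keep it polling; a deadline could be added if that matters.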
