I have implemented code to re-consume a message when a Kafka exception occurs. I save the TopicPartitionOffset in a local variable, and a delayed-retry mechanism puts the thread to sleep for some time before the next re-consume attempt.
I have tried Seek(topicPartitionOffset). It works most of the time, but sometimes fails with the error "Local: Erroneous state". I noticed that if the thread sleeps for more than 5 minutes (this is probably a default setting on the broker), the Assignment list and MemberId both become empty, and Seek then throws "Local: Erroneous state". It seems the connection to the broker was closed. I tried calling Assign and Resume, but neither works.
I tried recreating the consumer (with Build) on the fly and assigning the saved TopicPartitionOffset to it. That seems to work, but I am not sure whether it is the correct way.
What is the correct way to achieve my goal? I have tried Assign(TopicPartitionOffset) before Seek, but it failed with the same error. The Assignment list is still empty, even if I put the thread to sleep for 20 seconds after Assign.
To reproduce: create a consumer, then pause in the debugger. After a few minutes the consumer's Assignment list and MemberId become empty. Then continue on to Seek(topicPartitionOffset), which throws "Local: Erroneous state".
It looks like you are hitting the case where the consumer's max poll interval is exceeded and the broker considers the consumer to be out of the consumer group.
We have the same issue. We need to Seek to the partition offset but are not sure how to handle that with a delay, since the delay causes the issue above. Poll needs to be called frequently, or the broker considers your consumer out of the group and neither Seek nor Assign will work.
thanks @saherahwal !
BTW, if we want to reconnect the consumer to the broker, what is the correct way to do it? Should I unsubscribe and subscribe again, rebuild the consumer, or do something else?
I'd love to be able to answer your question. We have the same question too.
As I understand it, to keep the consumer instance alive you need to keep calling poll(), or Consume(), which polls internally in .NET. This could mean moving processing to another thread and continuing to poll on the consumer thread. However, this complicates your commits, and you risk losing messages on failure.
Alternatively, you can publish failed messages to retry topics, similar to what is described here, and have separate consumers consume from those topics: https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
That said, these approaches address processing exceptions, where the message arrived but processing faulted or an external service used in processing is down (e.g. MySQL or MongoDB used to enrich messages). But what happens if the exception is due to Kafka connectivity or brokers being down? That is the case I am trying to find the best approach for.
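To make the retry-topic idea concrete, here is a minimal sketch using Confluent.Kafka. The topic name "orders.retry", the "retry-reason" header, and the HandleAsync helper are hypothetical names chosen for illustration; they do not come from the linked article or from the library.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class RetryTopicExample
{
    // Hypothetical retry topic name (assumption, not a library convention).
    const string RetryTopic = "orders.retry";

    public static async Task HandleAsync(
        IConsumer<string, string> consumer,
        IProducer<string, string> producer,
        ConsumeResult<string, string> record,
        Action<ConsumeResult<string, string>> process)
    {
        try
        {
            process(record);
        }
        catch (Exception ex)
        {
            // Hand the failed message to the retry topic instead of
            // sleeping on the consumer thread.
            var headers = new Headers();
            headers.Add("retry-reason", Encoding.UTF8.GetBytes(ex.Message));

            await producer.ProduceAsync(RetryTopic, new Message<string, string>
            {
                Key = record.Message.Key,
                Value = record.Message.Value,
                Headers = headers
            });
        }

        // Reached after success or after a successful hand-off.
        // If ProduceAsync throws, the offset is not committed.
        consumer.Commit(record);
    }
}
```

A separate consumer subscribed to the retry topic can then apply whatever delay it needs without holding up the main consumer.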
I like the JavaDoc at https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html. Under the "Detecting Consumer Failures" header it explains: "For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned."
I hope that discussion helps in your case. I do have some open questions, though, that I am hoping someone from Confluent can chime in on.
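A rough .NET sketch of the pattern the JavaDoc describes might look like the following. It assumes enable.auto.commit is false and that no rebalance happens while the handler runs; the Process method is a hypothetical stand-in for slow processing or a delayed retry.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class PausedPollingExample
{
    // Hypothetical handler standing in for slow work (assumption).
    static void Process(ConsumeResult<string, string> record) =>
        Thread.Sleep(TimeSpan.FromMinutes(10));

    public static void Run(IConsumer<string, string> consumer, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var record = consumer.Consume(ct);
            if (record.IsPartitionEOF) continue;

            // Pause every assigned partition so that the polling below
            // is not expected to hand back new records.
            var paused = new List<TopicPartition>(consumer.Assignment);
            consumer.Pause(paused);

            // Process on another thread while this thread keeps polling,
            // so max.poll.interval.ms is not exceeded.
            var work = Task.Run(() => Process(record));
            while (!work.IsCompleted)
            {
                consumer.Consume(TimeSpan.FromMilliseconds(500));
            }

            work.GetAwaiter().GetResult(); // rethrows a processing failure
            consumer.Commit(record);       // commit only after processing finished
            consumer.Resume(paused);
        }
    }
}
```

If a rebalance does occur during processing, the paused list may no longer match the assignment, so Resume and Commit can fail and newly assigned partitions would not be paused. A production version would need rebalance handlers to cover that case.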
> BTW, if we want to reconnect consumer to broker, what is the correct way to do it? Should I unsubscribe / subscribe, or re-build consumer, or some other ways?
it's all automatic. once you begin polling again, the consumer will rejoin the group and get assigned new partitions. you may not be able to commit offsets though, because the consumer may not own the partition you were processing a message for. note that you can increase max.poll.interval.ms to be larger than the maximum processing time.
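As an illustration of the max.poll.interval.ms suggestion, a consumer configuration could look like the sketch below. The broker address, group id, and the 15-minute value are placeholder assumptions; the client default for max.poll.interval.ms is 300000 ms (5 minutes), which matches the timing observed earlier in this thread.

```csharp
using Confluent.Kafka;

public static class ConsumerConfigExample
{
    public static IConsumer<string, string> Build()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder address
            GroupId = "example-group",           // placeholder group id
            EnableAutoCommit = false,
            // Must be larger than the longest expected gap between
            // Consume() calls, including any retry delay.
            MaxPollIntervalMs = 900000           // 15 minutes (assumed value)
        };

        return new ConsumerBuilder<string, string>(config).Build();
    }
}
```

A larger interval also means the group takes longer to notice a consumer that is genuinely stuck, so the value is a trade-off rather than something to set arbitrarily high.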
> But what happens if the exception is due to Kafka connectivity or brokers down...etc?
you can generally just ignore/log the error and keep polling.
you generally shouldn't need to seek (which btw currently flushes the client side message cache). if a rebalance happens, new messages will be delivered from the last committed offset.
@mhowlett thanks for your answer - much appreciated.
We noticed that when Schema Registry is down, Consume() throws a ConsumeException, and that exception carries a valid ConsumerRecord with a valid topic, partition and offset. If we ignore the error and keep polling in that case, we receive the next messages, so we lose messages unless we Seek.
That said, other errors that cause Consume to throw a ConsumeException do not have a valid topic, partition and offset in the ConsumerRecord, so Seek would fail and is not necessary.
We currently detect Schema Registry being down as follows, and the detection works. Is what we're doing correct?
I wish there were a separate exception to catch when Schema Registry is down, as opposed to other exceptions where Seek is not needed. But perhaps you can explain whether Seek is unnecessary in that case too.
public bool IsSchemaRegistryDown(ConsumeException consumeException)
{
    var errorCode = consumeException.Error.Code;

    // A key or value deserialization error whose inner exception is an
    // HttpRequestException is treated as "Schema Registry is down".
    if (errorCode == ErrorCode.Local_KeyDeserialization ||
        errorCode == ErrorCode.Local_ValueDeserialization)
    {
        if (consumeException.InnerException is HttpRequestException)
        {
            _logger.LogError(consumeException.InnerException, "SchemaRegistry is down.");
            return true;
        }
    }

    return false;
}
Thanks for your time,
Saher
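For context, this is roughly how such a check could be wired into a consume loop. It is a sketch rather than a recommended pattern: the 5-second backoff and the Run helper are assumptions, and the detection logic simply mirrors the check above with Console output in place of the logger.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using Confluent.Kafka;

public static class SeekOnRegistryOutage
{
    // Same heuristic as above: a deserialization error caused by an HTTP failure.
    static bool IsSchemaRegistryDown(ConsumeException ex) =>
        (ex.Error.Code == ErrorCode.Local_KeyDeserialization ||
         ex.Error.Code == ErrorCode.Local_ValueDeserialization) &&
        ex.InnerException is HttpRequestException;

    public static void Run<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        Action<ConsumeResult<TKey, TValue>> handle,
        CancellationToken ct)
    {
        // Assumed backoff; keep it well below max.poll.interval.ms.
        var backoff = TimeSpan.FromSeconds(5);

        while (!ct.IsCancellationRequested)
        {
            try
            {
                handle(consumer.Consume(ct));
            }
            catch (ConsumeException ex) when (IsSchemaRegistryDown(ex))
            {
                // The failed record's position is valid here, so rewind
                // to it and let the next Consume() fetch it again.
                consumer.Seek(ex.ConsumerRecord.TopicPartitionOffset);
                Thread.Sleep(backoff);
            }
            catch (ConsumeException ex)
            {
                // Other consume errors: log and keep polling.
                Console.Error.WriteLine($"Consume error: {ex.Error.Reason}");
            }
        }
    }
}
```

Because the backoff is short, the consumer keeps polling often enough to stay in the group, which avoids the "Local: Erroneous state" failure seen with long sleeps.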