We've been getting this error since upgrading to beta3:
Subscribe :: Consume exception: Local: Maximum application poll interval (max.poll.interval.ms) exceeded - Local: Maximum application poll interval (max.poll.interval.ms) exceeded (ConsumeException) - Confluent.Kafka.ConsumeException: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
   at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
   at Wayfair.Common.MessageQueue.Kafka.KafkaSubscriberChannel`2.Subscribe(IEnumerable`1 targetQueues, Action`2 messageHandler, ISubscriberEventHandler eventHandler)
We kept raising max.poll.interval.ms to determine whether we have a process that is taking far too long. It is currently set to 30 minutes, which is far longer than any process we would expect to run.
We are mostly out of ideas. Is there a recommended way to solve this?
To reproduce, use the config below. The error occurs after a fairly long period of time. I'm not sure exactly how long: I have seen it within an hour, and other times it seems to take a few hours to show up.
Client version: v1.0-beta3

var config = new ConsumerConfig
{
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,   // offsets must be stored explicitly with StoreOffset()
    HeartbeatIntervalMs = 3000,
    AutoCommitIntervalMs = 5000,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    MaxPollIntervalMs = (int?) TimeSpan.FromMinutes(30).TotalMilliseconds   // 1,800,000 ms
};
And the consumer is built like this:
var consumer = new ConsumerBuilder<TKey, TMessage>(config)
    .SetKeyDeserializer(keyDeserializer)
    .SetValueDeserializer(valueDeserializer)
    .SetErrorHandler(eventHandler.OnError)
    .SetLogHandler(eventHandler.OnLog)
    .SetOffsetsCommittedHandler(eventHandler.OnOffsetsCommitted)
    .SetStatisticsHandler(eventHandler.OnStatistics)
    .SetRebalanceHandler((c, e) =>
    {
        if (e.IsAssignment)
        {
            // Take the partitions handed out by the group, then notify the application.
            c.Assign(e.Partitions);
            eventHandler.OnPartitionsAssigned(c, e);
        }
        else
        {
            // Drop the current assignment, then notify the application.
            c.Unassign();
            eventHandler.OnPartitionsRevoked(c, e);
        }
    })
    .Build();
@mhowlett The CONSUMER_ERR error string, which contains more information, doesn't seem to be included in the exception output. Can you verify?
@edenhill - yes, it's just using a generic error message (marking as enhancement).
I haven't seen this problem before, so based on the information provided, my first guess is that the cause is at the application level. Have you added logging to confirm how often Consume is being called?
Not yet. We are finishing a project that will give us insight into how frequently each consumer consumes messages. Once it's deployed, I'll keep an eye on those numbers and report back in a few days.
I am still seeing this issue on RC2.
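One way to add that logging is sketched below, under the assumption that the consume loop can read a monotonic clock (for example `Stopwatch.Elapsed`) right before each `Consume` call. `PollIntervalTracker`, `RecordPoll` and `ShouldWarn` are hypothetical names, not part of Confluent.Kafka; the class only records the gap between consecutive polls and the largest gap seen so far.

```csharp
using System;

// Hypothetical helper (not part of Confluent.Kafka): tracks the time between
// consecutive Consume calls so slow iterations can be logged well before they
// approach max.poll.interval.ms.
public sealed class PollIntervalTracker
{
    private readonly TimeSpan _warnThreshold;
    private TimeSpan? _lastPoll;

    public PollIntervalTracker(TimeSpan warnThreshold)
    {
        _warnThreshold = warnThreshold;
    }

    // Largest gap observed so far between two consecutive polls.
    public TimeSpan MaxGap { get; private set; } = TimeSpan.Zero;

    // Call immediately before each Consume with a monotonic timestamp.
    // Returns the gap since the previous call, or null on the first call.
    public TimeSpan? RecordPoll(TimeSpan now)
    {
        TimeSpan? gap = _lastPoll.HasValue ? now - _lastPoll.Value : (TimeSpan?)null;
        _lastPoll = now;
        if (gap.HasValue && gap.Value > MaxGap)
        {
            MaxGap = gap.Value;
        }
        return gap;
    }

    // True when a gap is long enough to deserve a warning log line.
    public bool ShouldWarn(TimeSpan? gap) => gap.HasValue && gap.Value >= _warnThreshold;
}
```

In the loop this would look roughly like `var gap = tracker.RecordPoll(stopwatch.Elapsed); if (tracker.ShouldWarn(gap)) { /* log gap */ }` followed by the existing `Consume` call; logging `MaxGap` periodically shows how close the application gets to the configured limit.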
Consumer config:
HeartbeatIntervalMs = 3000 (default)
SessionTimeoutMs = 10000 (default)
SocketKeepaliveEnable = true
Acks = Acks.Leader
EnableAutoCommit = true
EnableAutoOffsetStore = false
AutoCommitIntervalMs = 5000
MaxPollIntervalMs = 1800000 (30 minutes)
The interval between consecutive poll calls is 6 minutes or less.
It seems the MaxPollIntervalMs override is being ignored: whenever the interval between two consecutive poll calls exceeds 5 minutes (the librdkafka default for max.poll.interval.ms is 300000 ms), I see the exception above.
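The reasoning above can be made concrete with a small stdlib-only check. It assumes only that `ConsumerConfig.MaxPollIntervalMs` is an `int?` where null means "use the librdkafka default of 300000 ms"; `PollIntervalCheck`, `EffectiveMaxPollIntervalMs` and `WouldExceed` are hypothetical helpers for illustration.

```csharp
using System;

// Hypothetical helpers (not part of Confluent.Kafka) for reasoning about
// which max.poll.interval.ms value a given poll gap would violate.
public static class PollIntervalCheck
{
    // librdkafka's default for max.poll.interval.ms (5 minutes).
    public const int DefaultMaxPollIntervalMs = 300_000;

    // ConsumerConfig.MaxPollIntervalMs is an int?; null means the default applies.
    public static int EffectiveMaxPollIntervalMs(int? configured) =>
        configured ?? DefaultMaxPollIntervalMs;

    // True if a gap of this length between two Consume calls exceeds the limit.
    public static bool WouldExceed(TimeSpan gap, int? configured) =>
        gap.TotalMilliseconds > EffectiveMaxPollIntervalMs(configured);
}
```

A 6-minute gap does not exceed a 1800000 ms (30-minute) setting but does exceed the 300000 ms default, so failures at just over 5 minutes are consistent with the default being in effect. Logging `config.MaxPollIntervalMs` at startup confirms what the .NET side is passing, although not what librdkafka finally applied.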
@mhowlett You may want to add a unit test that verifies the .NET configuration is really applied to the rd_kafka_conf_t, using rd_kafka_conf_dump().
That's not easy to do without exposing rd_kafka_conf_dump() in the API (there may be value in doing that, but it isn't implemented).
It seems unlikely to me that .NET isn't setting this; there's not much that can go wrong.
Have you tried RC6?
Thanks for responding. I haven't tried RC6 yet, but I will. Is it possible to include the actual interval value in the error message instead of just the config key? Maybe something like: Maximum application poll interval (max.poll.interval.ms=xxxxx) exceeded.
@mhowlett I am running into a similar issue. I have just upgraded Confluent.Kafka to v1.1.0.
Here is the related log message:
Application maximum poll interval (300000ms) exceeded by 375ms (adjust max.poll.interval.ms for long-running message processing): leaving group
My question is: what is the best way to recover from this situation from within the code, without recycling the Windows service in which the consumer is running?
Some messages are going to take longer to process. Instead of adjusting max.poll.interval.ms, is there a way to force the consumer to reconnect when this happens? In other words, is there a way to detect this condition and recover from it?
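A minimal sketch of detect-and-continue, assuming Confluent.Kafka 1.x (`IConsumer<,>`) and assuming librdkafka rejoins the group on the next consume call after leaving it because max.poll.interval.ms was exceeded. The stack trace at the top of this issue shows the condition surfacing as a `ConsumeException`; the sketch matches it on `ErrorCode.Local_MaxPollExceeded` and keeps polling instead of restarting the service. `ResilientConsumeLoop` and the `process` delegate are hypothetical, and string keys/values are placeholders.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper (not part of Confluent.Kafka): a consume loop that keeps
// running after max.poll.interval.ms is exceeded instead of tearing down.
public static class ResilientConsumeLoop
{
    // True when Consume reported that the application exceeded max.poll.interval.ms.
    public static bool IsMaxPollExceeded(ConsumeException ex) =>
        ex.Error.Code == ErrorCode.Local_MaxPollExceeded;

    public static void Run(
        IConsumer<string, string> consumer,
        Action<ConsumeResult<string, string>> process,   // hypothetical message handler
        CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            ConsumeResult<string, string> result;
            try
            {
                result = consumer.Consume(ct);
            }
            catch (ConsumeException ex) when (IsMaxPollExceeded(ex))
            {
                // The consumer has already left the group. Assuming librdkafka rejoins
                // on the next Consume call, log and keep polling.
                Console.Error.WriteLine($"max.poll.interval.ms exceeded ({ex.Error.Reason}); continuing");
                continue;
            }
            catch (OperationCanceledException)
            {
                break;
            }

            process(result);

            try
            {
                // EnableAutoOffsetStore = false: store only after processing, so the
                // auto-commit never commits a message that was not fully handled.
                consumer.StoreOffset(result);
            }
            catch (KafkaException ex)
            {
                // Newer librdkafka versions may refuse to store an offset for a partition
                // revoked during processing; the message will then be redelivered.
                Console.Error.WriteLine($"Could not store offset {result.TopicPartitionOffset}: {ex.Error.Reason}");
            }
        }
    }
}
```

This gives at-least-once behaviour: when the limit is exceeded, the partitions may already have moved to another instance, so the slow message can be processed twice. It recovers from the condition but does not prevent it; the limit still has to cover the real processing time, or polling must continue during processing.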
Any news about this issue?
We aren't aware of any issues related to max.poll.interval.ms in v1.4.0.
@mhowlett Do you mean that in v1.4.0 this error is not thrown if the consumer is paused before a long-running message is processed and resumed afterwards to consume the next message?
The application must call poll/consume even if it has paused its assigned partitions.
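Following that advice, a long-running message can be handed to a background task while the consume loop pauses the assignment and keeps calling `Consume` with a short timeout until the work finishes. This is a sketch assuming Confluent.Kafka 1.x (`Pause`, `Resume`, `Assignment`, `Consume(TimeSpan)`, `Seek`, `StoreOffset`); `PausedProcessing` and the `process` delegate are hypothetical. It does not handle the message's own partition being revoked mid-processing.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper (not part of Confluent.Kafka): processes one slow message
// on a background task while this thread keeps polling, so the application
// never stops calling Consume for longer than max.poll.interval.ms.
public static class PausedProcessing
{
    public static void ProcessWhilePolling(
        IConsumer<string, string> consumer,
        ConsumeResult<string, string> result,
        Action<ConsumeResult<string, string>> process)   // hypothetical long-running handler
    {
        // Stop fetching from every currently assigned partition.
        consumer.Pause(consumer.Assignment);

        Task work = Task.Run(() => process(result));

        // Keep calling Consume while paused; this is what keeps the poll interval short.
        while (!work.IsCompleted)
        {
            var unexpected = consumer.Consume(TimeSpan.FromMilliseconds(100));
            if (unexpected != null)
            {
                // Only expected if a rebalance assigned new, unpaused partitions:
                // pause that partition as well and rewind so the message is re-delivered.
                consumer.Pause(new[] { unexpected.TopicPartition });
                consumer.Seek(unexpected.TopicPartitionOffset);
            }
        }

        work.GetAwaiter().GetResult();          // rethrow any processing exception
        consumer.StoreOffset(result);           // EnableAutoOffsetStore = false, as above
        consumer.Resume(consumer.Assignment);   // resume everything currently assigned
    }
}
```

The consumer is only touched from the polling thread; the background task sees only the already-consumed `ConsumeResult`, which avoids calling the consumer concurrently from two threads.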
@edenhill Do you mean that pausing all assigned partitions would not help with this error?
@mhowlett Does this mean there is no way to avoid this error if we absolutely need long-running message processing?
The point of max.poll.interval.ms is to provide a heartbeat between the application and the consumer: if the application has not called poll/consume (heartbeated) within this interval, it is deemed dead, stalled, stuck, or malfunctioning, and the consumer leaves the group so that its assigned partitions can be reassigned to a live application instance.
max.poll.interval.ms should therefore be set to the theoretical maximum processing time, plus some margin.
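As a worked example of that sizing rule: with Confluent.Kafka's one-message-per-`Consume` model, the value to cover is the worst-case processing time of a single message. The helper below is a hypothetical sketch that multiplies that time by a chosen safety factor; the factor itself is an assumption the application has to pick.

```csharp
using System;

// Hypothetical helper (not part of Confluent.Kafka): derives a value for
// ConsumerConfig.MaxPollIntervalMs from the worst-case processing time of one
// message plus headroom (safetyFactor 1.25 means 25% extra).
public static class PollIntervalSizing
{
    public static int MaxPollIntervalMsFor(TimeSpan worstCaseProcessing, double safetyFactor)
    {
        if (safetyFactor < 1.0)
            throw new ArgumentOutOfRangeException(nameof(safetyFactor), "Must be at least 1.0.");

        double ms = Math.Ceiling(worstCaseProcessing.TotalMilliseconds * safetyFactor);
        if (ms > int.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(worstCaseProcessing), "Result does not fit in an int.");

        return (int)ms;
    }
}
```

For a worst case of 20 minutes and 25% headroom, this gives 1,200,000 ms × 1.25 = 1,500,000 ms (25 minutes). librdkafka applies its own validation to the final value, so check the configuration reference for the allowed range.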