When my code gets stuck (dumping, debugging, lack of system resources, bad code...) and, as a result, I do not call the Consume method on the Consumer for longer than max.poll.interval.ms, the Consumer throws an exception with the following error:
Code=Local_MaxPollExceeded
IsBrokerError=false
IsError=true
IsFatal=false
IsLocalError=true
The Consumer does not recover, and I need to restart my app (I suppose recreating the consumer would also work). From my point of view, shouldn't this error be fatal? If not, why doesn't the Consumer recover?
Steps to reproduce: consume a message from a topic, then stall the app with Thread.Sleep() for longer than max.poll.interval.ms.
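A minimal repro sketch, assuming the Confluent.Kafka 1.x NuGet package and a reachable broker. The broker address `localhost:9092` and the topic name `my-topic` are placeholders. Partitions are assigned with Assign() rather than Subscribe(), matching the reporter's setup mentioned later in the thread; with librdkafka versions before v1.4.0 this combination is what produces the exception.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class MaxPollRepro
{
    static void Main()
    {
        // Placeholder broker and group; timeouts match the values used in this thread.
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "myGroup",
            EnableAutoCommit = false,
            SessionTimeoutMs = 20000,
            MaxPollIntervalMs = 30000,
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        // Manual assignment (no Subscribe()); "my-topic" is a hypothetical topic name.
        consumer.Assign(new TopicPartitionOffset("my-topic", 0, Offset.Beginning));

        var first = consumer.Consume(TimeSpan.FromSeconds(10));
        Console.WriteLine(first == null ? "no message yet" : $"got offset {first.Offset}");

        // Simulate a stuck application: stay away from Consume() past max.poll.interval.ms.
        Thread.Sleep(40000);

        try
        {
            consumer.Consume(TimeSpan.FromSeconds(1));
        }
        catch (ConsumeException e) when (e.Error.Code == ErrorCode.Local_MaxPollExceeded)
        {
            // Prints the same flags reported in this issue.
            Console.WriteLine($"{e.Error.Code} IsFatal={e.Error.IsFatal} " +
                              $"IsLocalError={e.Error.IsLocalError} IsBrokerError={e.Error.IsBrokerError}");
        }
    }
}
```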
The consumer should recover as soon as you start calling Consume() again, which will trigger a rejoin of the group.
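For a consumer that uses Subscribe(), the recovery described here amounts to treating Local_MaxPollExceeded as non-fatal and simply continuing to poll. The sketch below assumes Confluent.Kafka 1.x; the broker address and topic name are placeholders, and the `Run` helper is hypothetical. Only an error with IsFatal set is allowed to escape, since that is the case where the consumer has to be disposed and recreated.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        // Placeholder connection settings.
        var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "myGroup" };

        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic"); // group subscription, so a rejoin can happen

        Run(consumer, cts.Token);
        consumer.Close();
    }

    // Hypothetical helper: poll until cancelled, surviving non-fatal errors.
    static void Run(IConsumer<Ignore, string> consumer, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));

                // null: no message within the timeout, or still waiting for
                // the group rejoin and a new assignment.
                if (result == null || result.IsPartitionEOF)
                    continue;

                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
            }
            catch (ConsumeException e) when (!e.Error.IsFatal)
            {
                // Local_MaxPollExceeded lands here; the next Consume() call
                // triggers the rejoin. Fatal errors are not caught and propagate.
                Console.WriteLine($"Non-fatal consume error: {e.Error.Code}");
            }
        }
    }
}
```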
I was expecting the same behaviour, but after this exception Consumer.Consume keeps returning null.
It will return null (after timeout expires) until the consumer has rejoined the group and received a new assignment, which may take up to max.poll.interval.ms + session.timeout.ms.
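With the timeouts the reporter uses in this thread (max.poll.interval.ms = 30000, session.timeout.ms = 20000), this upper bound works out to 50 s, far shorter than the 4+ minutes of null results reported next. The tiny C# program below only performs that arithmetic; the function name `RejoinBoundMs` is illustrative, not part of any library.

```csharp
using System;

// Timeout values the reporter configured in this thread.
const int maxPollIntervalMs = 30000;
const int sessionTimeoutMs = 20000;

// Upper bound, per the maintainer's estimate, on how long Consume() can keep
// returning null before the consumer rejoins and receives a new assignment.
static int RejoinBoundMs(int pollIntervalMs, int sessionMs) => pollIntervalMs + sessionMs;

int boundMs = RejoinBoundMs(maxPollIntervalMs, sessionTimeoutMs);
Console.WriteLine($"Expect a new assignment within about {boundMs / 1000} s");
```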
I just tested it, and it does not work.
I overrode the settings as follows:
consumerConfig.SessionTimeoutMs = 20000;
consumerConfig.MaxPollIntervalMs = 30000;
After the first call to consumer.Consume() I called Thread.Sleep(40000), which resulted in the exception.
I have now been checking the Consume result for more than 4 minutes after the exception, and consumer.Consume keeps returning null.

One thing to note: we do not use consumer groups; we assign the start offset manually.
Can you reproduce this with Debug: "cgrp" set and provide us the logs?
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:app]: Group "myGroup": updating member id "(not-set)" -> ""
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:app]: librdkafka v1.3.0 (0x10300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, SSL ZLIB SNAPPY SASL_SCRAM PLUGINS HDRHISTOGRAM, debug 0x100)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state init -> query-coord (v1, join-state init)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v1, join-state init)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changing coordinator -1 -> 3
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" coordinator set to broker 10.1.1.22:9092/3
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v1, join-state init)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state wait-broker-transport
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v1, join-state init)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op ASSIGN (v0) in state up (join state init, v1 vs 0)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": new assignment of 1 partition(s) in join state init
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_assign:2432: new version barrier v2
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": assigning 1 partition(s) in join state init
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state init -> assigned (v2, state up)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v3
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2477)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: List with 1 partition(s):
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: company-events-metadata-991935-637171562028072598 [0] offset 0
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state assigned -> started (v3, state up)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op PARTITION_JOIN in state up (join state started, v3) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": add company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1 Level: Warning Message:[thrd:main]: Application maximum poll interval (30000ms) exceeded by 288ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": leave (in state up)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" is rebalancing in state up (join-state started) with assignment: max.poll.interval.ms exceeded
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state started -> wait-unassign (v3, state up)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_unassign:2345: new version barrier v4
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": unassigning 1 partition(s) (v4)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): unassign
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op PARTITION_LEAVE in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": delete company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state wait-unassign -> init (v4, state up)
Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: LeaveGroup response error in state up: Broker: Unknown member
Is that all the logs there are? Nothing else emitted after that last line?
That's all.
Even after waiting 30s or so?
I will give it another try, wait for it :)
No, that is all. I waited 7 minutes and there was nothing else in the log.
Okay, thank you, will investigate.
Can you provide your consumer config?
OK, after 10 minutes a few more lines appeared:
Time: 15:03:53 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:app]: Group "myGroup": updating member id "(not-set)" -> ""
Time: 15:03:53 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:app]: librdkafka v1.3.0 (0x10300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, SSL ZLIB SNAPPY SASL_SCRAM PLUGINS HDRHISTOGRAM, debug 0x100)
Time: 15:03:53 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state init -> query-coord (v1, join-state init)
Time: 15:03:53 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Time: 15:03:53 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v1, join-state init)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changing coordinator -1 -> 3
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" coordinator set to broker 10.1.1.22:9092/3
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v1, join-state init)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state wait-broker-transport
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v1, join-state init)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op ASSIGN (v0) in state up (join state init, v1 vs 0)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": new assignment of 1 partition(s) in join state init
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_assign:2432: new version barrier v2
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": assigning 1 partition(s) in join state init
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state init -> assigned (v2, state up)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v3
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2477)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: List with 1 partition(s):
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: company-events-metadata-991935-637171562028072598 [0] offset 0
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state assigned -> started (v3, state up)
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op PARTITION_JOIN in state up (join state started, v3) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:03:54 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": add company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Warning Message:[thrd:main]: Application maximum poll interval (30000ms) exceeded by 312ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": leave (in state up)
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" is rebalancing in state up (join-state started) with assignment: max.poll.interval.ms exceeded
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state started -> wait-unassign (v3, state up)
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_unassign:2345: new version barrier v4
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": unassigning 1 partition(s) (v4)
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): unassign
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op PARTITION_LEAVE in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": delete company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed join state wait-unassign -> init (v4, state up)
Time: 15:04:25 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: LeaveGroup response error in state up: Broker: Unknown member
Time: 15:14:01 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.21:9092/2: Group "myGroup": querying for coordinator: intervaled in state up
Time: 15:14:02 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.21:9092/2: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state up -> query-coord (v4, join-state init)
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v4, join-state init)
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v4, join-state init)
Time: 15:14:31 Handle: rdkafka#consumer-1 Level: Debug Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v4, join-state init)
Client config:
GroupId = "myGroup",
EnableAutoCommit = false,
EnablePartitionEof = true,
MaxPartitionFetchBytes = 50 * 1024 * 1024,
AutoOffsetReset = AutoOffsetReset.Earliest,
SessionTimeoutMs = 20000,
MaxPollIntervalMs = 30000,
Debug = "cgrp",
BootstrapServers = "10.1.1.20:9092,10.1.1.21:9092,10.1.1.22:9092",
TopicMetadataRefreshIntervalMs = 30000,
MessageMaxBytes = 50 * 1024 * 1024,
ReceiveMessageMaxBytes = 2 * 50 * 1024 * 1024,
Ah, I think I know what the problem is, you are not using Subscribe(), but Assign()ing partitions directly.
The max.poll.interval.ms should not be enforced unless there is an active group subscription, this is a bug in librdkafka that we'll fix for v1.4.0.
The workaround is to set max.poll.interval.ms to its maximum value.
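Applied to the reporter's client config, the workaround might look like the fragment below. It assumes Confluent.Kafka 1.x and librdkafka before v1.4.0, where the poll interval is wrongly enforced for Assign()ed partitions; 86400000 ms (24 h) is the upper bound librdkafka documents for max.poll.interval.ms. Only the settings relevant to the timeouts are shown.

```csharp
using Confluent.Kafka;

// Workaround sketch for librdkafka < v1.4.0 with manual Assign():
// raise max.poll.interval.ms to its documented maximum so the
// client-side poll timer effectively never fires.
var config = new ConsumerConfig
{
    BootstrapServers = "10.1.1.20:9092,10.1.1.21:9092,10.1.1.22:9092",
    GroupId = "myGroup",
    EnableAutoCommit = false,
    SessionTimeoutMs = 20000,
    MaxPollIntervalMs = 86400000, // 24 h; must stay >= SessionTimeoutMs
};
```

Once on v1.4.0 or later, the override should no longer be needed for consumers that only use Assign().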
Thank you for the help.
Fixed in the upcoming v1.4.0.