Confluent-kafka-dotnet: Consumer error Local_MaxPollExceeded is Fatal for consumer

Created on 20 Mar 2020 · 17 comments · Source: confluentinc/confluent-kafka-dotnet

Description

When my code gets stuck (memory dumps, debugging, lack of system resources, bad code, ...) and, as a result, I do not call the Consume method on the consumer for longer than max.poll.interval.ms, the consumer throws an exception with the following error:
Code=Local_MaxPollExceeded
IsBrokerError=false
IsError=true
IsFatal=false
IsLocalError=true

The consumer does not recover, and I need to restart my app (I suppose recreating the consumer would also work). From my point of view, shouldn't this error be fatal? If not, why does the consumer not recover?

How to reproduce

Consume a message from the topic, then block the app with Thread.Sleep() for longer than max.poll.interval.ms.

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka NuGet version: 1.3.0
  • [x] Apache Kafka version: kafka_2.12-2.2.0
  • [x] Operating system: Windows/Linux
Labels: bug, librdkafka

All 17 comments

The consumer should recover as soon as you start calling Consume() again, which will trigger a rejoin of the group.

I was expecting the same behaviour, but after this exception Consumer.Consume keeps returning null.

It will return null (after timeout expires) until the consumer has rejoined the group and received a new assignment, which may take up to max.poll.interval.ms + session.timeout.ms.

I just tested it, and it does not work.

I overrode the settings as follows:
consumerConfig.SessionTimeoutMs = 20000;
consumerConfig.MaxPollIntervalMs = 30000;
and after the first call to consumer.Consume() I called Thread.Sleep(40000), which resulted in the exception.

I have now been checking the Consume result for more than 4 minutes after the exception, and consumer.Consume keeps returning null.
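Taking the maintainer's upper bound at face value and plugging in the reporter's overridden settings (SessionTimeoutMs = 20000, MaxPollIntervalMs = 30000) shows that a 4-minute wait should have been far more than enough for a rejoin, which suggests no rejoin was happening at all:

```latex
t_{\text{rejoin}} \le \texttt{max.poll.interval.ms} + \texttt{session.timeout.ms}
  = 30000 + 20000 = 50000\ \text{ms} = 50\ \text{s} \ll 240\ \text{s} = 4\ \text{min}
```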


One thing to note: we do not use consumer groups, and we assign the start offset manually.

Can you reproduce this with Debug: "cgrp" set and provide us the logs?

Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:app]: Group "myGroup": updating member id "(not-set)" -> ""
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:app]: librdkafka v1.3.0 (0x10300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, SSL ZLIB SNAPPY SASL_SCRAM PLUGINS HDRHISTOGRAM, debug 0x100)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state init -> query-coord (v1, join-state init)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v1, join-state init)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changing coordinator -1 -> 3
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" coordinator set to broker 10.1.1.22:9092/3
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v1, join-state init)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state wait-broker-transport
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v1, join-state init)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op ASSIGN (v0) in state up (join state init, v1 vs 0)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": new assignment of 1 partition(s) in join state init
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_assign:2432: new version barrier v2
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": assigning 1 partition(s) in join state init
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state init -> assigned (v2, state up)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v3
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2477)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: List with 1 partition(s):
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]:  company-events-metadata-991935-637171562028072598 [0] offset 0
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state assigned -> started (v3, state up)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op PARTITION_JOIN in state up (join state started, v3) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": add company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1  Level: Warning  Message:[thrd:main]: Application maximum poll interval (30000ms) exceeded by 288ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": leave (in state up)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" is rebalancing in state up (join-state started) with assignment: max.poll.interval.ms exceeded
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state started -> wait-unassign (v3, state up)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_unassign:2345: new version barrier v4
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": unassigning 1 partition(s) (v4)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): unassign
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op PARTITION_LEAVE in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": delete company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state wait-unassign -> init (v4, state up)
Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: LeaveGroup response error in state up: Broker: Unknown member

Is that all the logs there are? Nothing else emitted after that last line?

That's all.

Even after waiting 30s or so?

I will give it another try, wait for it :)

No, that's all. I waited 7 minutes and nothing else appeared in the log.

Okay, thank you, will investigate.
Can you provide your consumer config?

OK, after 10 minutes a few more lines appeared:

Time: 15:03:53  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:app]: Group "myGroup": updating member id "(not-set)" -> ""
Time: 15:03:53  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:app]: librdkafka v1.3.0 (0x10300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, SSL ZLIB SNAPPY SASL_SCRAM PLUGINS HDRHISTOGRAM, debug 0x100)
Time: 15:03:53  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state init -> query-coord (v1, join-state init)
Time: 15:03:53  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Time: 15:03:53  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v1, join-state init)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changing coordinator -1 -> 3
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" coordinator set to broker 10.1.1.22:9092/3
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v1, join-state init)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state wait-broker-transport
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v1, join-state init)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op ASSIGN (v0) in state up (join state init, v1 vs 0)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": new assignment of 1 partition(s) in join state init
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_assign:2432: new version barrier v2
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": assigning 1 partition(s) in join state init
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state init -> assigned (v2, state up)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v3
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2477)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: List with 1 partition(s):
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]:  company-events-metadata-991935-637171562028072598 [0] offset 0
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state assigned -> started (v3, state up)
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op PARTITION_JOIN in state up (join state started, v3) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:03:54  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": add company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Warning  Message:[thrd:main]: Application maximum poll interval (30000ms) exceeded by 312ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": leave (in state up)
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" is rebalancing in state up (join-state started) with assignment: max.poll.interval.ms exceeded
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state started -> wait-unassign (v3, state up)
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": rd_kafka_cgrp_unassign:2345: new version barrier v4
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": unassigning 1 partition(s) (v4)
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): unassign
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op PARTITION_LEAVE in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": delete company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v4) for company-events-metadata-991935-637171562028072598 [0]
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed join state wait-unassign -> init (v4, state up)
Time: 15:04:25  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: LeaveGroup response error in state up: Broker: Unknown member
Time: 15:14:01  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.21:9092/2: Group "myGroup": querying for coordinator: intervaled in state up
Time: 15:14:02  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.21:9092/2: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state up -> query-coord (v4, join-state init)
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup": querying for coordinator: intervaled in state query-coord
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state query-coord -> wait-coord (v4, join-state init)
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: 10.1.1.20:9092/1: Group "myGroup" coordinator is 10.1.1.22:9092 id 3
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-coord -> wait-broker-transport (v4, join-state init)
Time: 15:14:31  Handle: rdkafka#consumer-1  Level: Debug    Message:[thrd:main]: Group "myGroup" changed state wait-broker-transport -> up (v4, join-state init)

Client config:

var consumerConfig = new ConsumerConfig
{
    GroupId = "myGroup",
    EnableAutoCommit = false,
    EnablePartitionEof = true,
    MaxPartitionFetchBytes = 50 * 1024 * 1024,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SessionTimeoutMs = 20000,
    MaxPollIntervalMs = 30000,
    Debug = "cgrp",
    BootstrapServers = "10.1.1.20:9092,10.1.1.21:9092,10.1.1.22:9092",
    TopicMetadataRefreshIntervalMs = 30000,
    MessageMaxBytes = 50 * 1024 * 1024,
    ReceiveMessageMaxBytes = 2 * 50 * 1024 * 1024
};

Ah, I think I know what the problem is: you are not using Subscribe(), but Assign()ing partitions directly.
max.poll.interval.ms should not be enforced unless there is an active group subscription; this is a bug in librdkafka that we'll fix in v1.4.0.
The workaround is to set max.poll.interval.ms to its maximum value.
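A minimal sketch of that workaround for an Assign()-based consumer on Confluent.Kafka 1.3.x, reusing the reporter's settings. It assumes the maximum allowed value of max.poll.interval.ms is 86400000 ms (24 hours), which is the upper bound we believe librdkafka's configuration reference lists; verify it against the librdkafka version you ship. Upgrading to v1.4.0 or later should make the override unnecessary.

```csharp
using Confluent.Kafka;

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "10.1.1.20:9092,10.1.1.21:9092,10.1.1.22:9092",
    GroupId = "myGroup",
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SessionTimeoutMs = 20000,
    // Workaround for librdkafka < 1.4.0: max.poll.interval.ms is enforced even
    // when partitions are Assign()ed without a group subscription, so raise it
    // to what we assume is its maximum (24 h). Must stay >= SessionTimeoutMs.
    MaxPollIntervalMs = 86400000
};
```

Because the consumer never joins a group when it only calls Assign(), raising the limit does not delay any rebalance; it only stops the spurious "leaving group" path shown in the logs above.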

Thank you for the help.

Fixed in the upcoming v1.4.0.
