Confluent-kafka-dotnet: Consumer not reconecting on Connection Closed

Created on 18 Jan 2018  路  11Comments  路  Source: confluentinc/confluent-kafka-dotnet

Description

I'm getting a connection error from kafka in my consumer, this happens from time to time, the problem is that after the connection error the client doesnt re-open a connection and I start to get lag, is there some configuration I'm missing? or why is my connection not being re-established?

This is my code:

        var config = new Dictionary<string, object>
        {
            { "group.id", groupId },
            { "bootstrap.servers", brokerList },
            { "queued.max.messages.kbytes", 100000 },
            { "statistics.interval.ms", 10000 },
            { "enable.auto.commit", false },
            { "default.topic.config", new Dictionary<string, object>()
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };

        this.consumer = new ConsumerAdapter<Null, string>(config, null, new StringDeserializer(Encoding.UTF8));

        this.consumer.OnPartitionEOF += (_, end) =>
            log.Info($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

        this.consumer.OnError += (_, error) =>
            log.Error($"Error: {error}");

        this.consumer.OnPartitionsAssigned += (_, partitions) =>
        {
            log.Info($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
            this.consumer.Assign(partitions);
        };

        this.consumer.OnPartitionsRevoked += (_, partitions) =>
        {
            log.Info($"Revoked partitions: [{string.Join(", ", partitions)}]");
            consumer.Unassign();
        };

        this.consumer.Subscribe(topics);


        ...


        while (consumer.Consume(out message, TimeSpan.FromSeconds(secondsTimeout)))
        {
                T value = JsonConvert.DeserializeObject<T>(message.Value);
                values.Add(value);
        }

Whe are using kafka 2.12 and we are running in an azure worker role.

thanks

All 11 comments

Do you have any client logs from such an occasion?
What confluent-kafka-dotnet and librdkafka.redist versions are you using?

This is the log I'm getting:
Error: 10.1.0.12:9092/3: Connection closed

I'm using confluent-kafka-dotnet V0.11.3 and librdkafka.redist V0.11.3

I'm also getting timeouts:
Error: 10.1.0.11:9092/2: 1 request(s) timed out: disconnect (average rtt 15891.000ms)

The problem is that it doesn't open the connection again, and I'm getting a great lag and I have to restart my service to get messages.

Looks like you might have connectivity issues.
Can you reproduce the problem with the "debug" config property set to "broker,cgrp,topic" and provide logs from where it goes down and doesn't come up again?

Thanks I changed the configuration and I'm waiting for the error to appear again

Checking the logs something weird happened the last log was at 2018-01-19T12:29:47.030Z and the last three messages where:

Reached end of topic queuing.location.deviceEvents partition 1, next message will be at offset 130008646
Reached end of topic queuing.location.deviceEvents partition 2, next message will be at offset 127016010
Reached end of topic queuing.location.deviceEvents partition 0, next message will be at offset 126081494

Since then the offsets haven't changed in the kafka manager. It tells the offsets are like this:

0 | 126094128
1 | 130021261
2 | 127022904

There is no error, there was a connection error at 2018-01-19T12:29:46.723Z but you can see the offsets changing after that.

Error: 10.1.0.12:9092/3: Connect to ipv4#10.1.0.12:9092 failed: No connection could be made because the target machine actively refused it...

I get this "Reached end of topic" logs in blocks of 1000 every 4 to 5 minutes.

I have to add that messages where arriving all time, so it should have been processing new messages

Hi, the problem was with kafka configuration, it's solved now. Thanks

What was correct configuration?

@afrancoc2000 Hi, could you share the correct kafka configuration?

@afrancoc2000 - I am seeing a similar issue as well. Can you share the correct configuration?

Sorry guys, someone in infrastructure solved the problem, I asked him to post it here but he doesn't remember what he did :(

Was this page helpful?
0 / 5 - 0 ratings