Confluent-kafka-dotnet: Error Message: Misleading since brokers are not down

Created on 23 May 2019 · 5 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

I am running the confluent/kafka Docker image on two brokers, along with confluent/zookeeper. I have a producer that uses the confluent-kafka-dotnet NuGet package, and a few consumers that use the same package. After a period of inactivity (somewhere between 5 and 15 minutes, probably closer to 15), I start to see these errors:

%3|1558619115.036|ERROR|rdkafka#producer-23| [thrd:kafka1:9092/bootstrap]: 2/2 brokers are down
%3|1558619115.161|ERROR|rdkafka#producer-25| [thrd:kafka:9092/bootstrap]: 2/2 brokers are down
%3|1558619115.409|ERROR|rdkafka#producer-26| [thrd:kafka:9092/bootstrap]: 2/2 brokers are down

However, I can still produce to and consume from my topics while these messages are spamming my console windows.

Minimal Program

Producer Example

using System;
using Confluent.Kafka;

namespace Producer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig { BootstrapServers = "kafka:9092,kafka1:9092" };

            // Report the outcome of each delivery once the broker has acknowledged it (or it has failed).
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i = 0; i < 100; ++i)
                {
                    p.Produce("my-topic", new Message<Null, string> { Value = "yo" + i }, handler);
                }

                // Wait for up to 10 seconds for any in-flight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}

Consumer Program

using System;
using System.Threading;
using Confluent.Kafka;

namespace Consumer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "kafka:9092,kafka1:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("my-topic");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }
    }
}

Some system information

  • I am running version 1.0.0 of confluent-kafka-dotnet.
  • My host machine is Windows 10, but I am using the .NET Core 2.2 Docker image for both my producers and my consumers.
  • Client configuration: .NET Core 2.2 WebAPI project. But I have also seen this happen in the sample minimal program I have provided.
  • Apache Kafka version: I am using the latest confluentinc/cp-kafka image, but I do not know the exact Kafka version.
  • Broker logs: Nothing in the logs indicates that the brokers are down; even ZooKeeper is not aware of them being down.

How to reproduce

1) Spin up two Kafka brokers (this also happens with any number of brokers).
2) Spin up a producer.
3) Spin up a few consumers (e.g. 2).
4) Begin producing to a topic. I am using a WebAPI that acts as a producer, so I post data to it and it produces the data to Kafka. My consumers save to a database and Elasticsearch, but they also log what was consumed to the console.
5) Go idle for about 15 minutes, closer to 20, though this may be less for you.
6) You should see a stream of log messages like the ones I posted.

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version.
  • [x] Apache Kafka version.
  • [x] Client configuration.
  • [x] Operating system.
  • [x] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [x] Critical issue.
enhancement

All 5 comments

I would guess the 'error' is due to the broker closing connections after 10 minutes because they've been idle. You'll see this error if all connections to all brokers are closed. You can safely ignore it - it's not an error as you point out - the client will open the connections again as it needs them.

By default, librdkafka writes log messages to stderr. You can change that behavior by setting a log handler.
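As a rough sketch of what setting a log handler could look like with the 1.0 builder API: `SetLogHandler` and `SetErrorHandler` are methods on `ProducerBuilder`, while `QuietProducerFactory`, `Create`, and `IsAllBrokersDownNotice` are hypothetical names made up for this illustration. Whether the "brokers are down" message reaches the log handler or the error handler may depend on the client version, so the sketch registers both. Matching on the message text is an assumption for demonstration purposes, not a recommended filter.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper class (not part of Confluent.Kafka).
public static class QuietProducerFactory
{
    // Assumption: the idle-connection notice can be recognised by its text.
    public static bool IsAllBrokersDownNotice(string message) =>
        message != null && message.Contains("brokers are down");

    public static IProducer<Null, string> Create(string bootstrapServers)
    {
        var conf = new ProducerConfig { BootstrapServers = bootstrapServers };

        return new ProducerBuilder<Null, string>(conf)
            // Once a log handler is set, librdkafka log lines are passed here
            // instead of being written to stderr.
            .SetLogHandler((_, log) =>
            {
                if (IsAllBrokersDownNotice(log.Message))
                {
                    // Demote the notice: write it to stdout as informational.
                    Console.WriteLine($"[info] {log.Name}: {log.Message}");
                }
                else
                {
                    Console.Error.WriteLine($"[{log.Level}] {log.Name} {log.Facility}: {log.Message}");
                }
            })
            // Client-level errors (as opposed to per-message delivery errors) arrive here.
            .SetErrorHandler((_, e) =>
            {
                if (e.Code == ErrorCode.Local_AllBrokersDown)
                {
                    // All connections are currently closed; the client reconnects on demand.
                    Console.WriteLine($"[info] {e.Reason}");
                }
                else
                {
                    Console.Error.WriteLine($"Kafka error: {e.Reason}");
                }
            })
            .Build();
    }
}
```

Usage would be something like `using (var p = QuietProducerFactory.Create("kafka:9092,kafka1:9092")) { ... }`, with the rest of the producer example unchanged.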

Note: it looks like you are maintaining many producer instances - you most likely don't want to do this.

There are a number of things about errors/logging that I don't like (and am trying to get improved), including elements of what you point out. There's more to this, and it's more complicated than it would first seem, however.

@mhowlett What do you mean about maintaining many producer instances? We were looking at one producer per microservice instance.

Other than that, that sounds good.

You seem to have lots of producers:

rdkafka#producer-23
rdkafka#producer-24
rdkafka#producer-25

@mhowlett Interesting.

I only create one producer in the constructor of my service, though the service is registered in the IoC container as transient, i.e. a new service instance (and so a new producer) is created for every request.

Should I have them as scoped or a singleton?

Singleton - what you're doing now results in lots of producer instances that aren't getting disposed. It's very expensive to create one (and conversely, one producer instance can handle 100k's of requests per second).
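A minimal sketch of the singleton registration, assuming the standard Microsoft.Extensions.DependencyInjection container used by ASP.NET Core. The extension method name `AddKafkaProducer` and the class `KafkaServiceCollectionExtensions` are hypothetical; `AddSingleton` and `ProducerBuilder` are the real APIs.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical extension class (not part of Confluent.Kafka).
public static class KafkaServiceCollectionExtensions
{
    // Registers one shared producer for the lifetime of the application.
    public static IServiceCollection AddKafkaProducer(
        this IServiceCollection services, string bootstrapServers)
    {
        return services.AddSingleton<IProducer<Null, string>>(_ =>
        {
            var conf = new ProducerConfig { BootstrapServers = bootstrapServers };
            // The factory runs once; every later resolution returns the same instance.
            return new ProducerBuilder<Null, string>(conf).Build();
        });
    }
}
```

In `Startup.ConfigureServices` this would be called as `services.AddKafkaProducer("kafka:9092,kafka1:9092");`. The service that previously built its own producer would then take an `IProducer<Null, string>` constructor parameter and can stay transient, since it no longer owns the producer. Because the instance is created by the container, the container disposes it when the root service provider is disposed.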
