I am running the confluent/kafka Docker image on two brokers, along with confluent/zookeeper. I have a producer that uses the confluent-kafka-dotnet NuGet package, and a few consumers that use the same package. After roughly 15 minutes, I start to see these errors:
%3|1558619115.036|ERROR|rdkafka#producer-23| [thrd:kafka1:9092/bootstrap]: 2/2 brokers are down
%3|1558619115.161|ERROR|rdkafka#producer-25| [thrd:kafka:9092/bootstrap]: 2/2 brokers are down
%3|1558619115.409|ERROR|rdkafka#producer-26| [thrd:kafka:9092/bootstrap]: 2/2 brokers are down
However, I can still produce to and consume from my topics while these messages spam my console windows.
using System;
using Confluent.Kafka;

namespace Producer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig { BootstrapServers = "kafka:9092,kafka1:9092" };

            // Called once per message with the delivery result (success or error).
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i = 0; i < 100; ++i)
                {
                    p.Produce("my-topic", new Message<Null, string> { Value = "yo" + i }, handler);
                }

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}
using System;
using System.Threading;
using Confluent.Kafka;

namespace Consumer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "kafka:9092,kafka1:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("my-topic");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }
    }
}
1) Spin up two Kafka brokers (this also happens with any number of brokers).
2) Spin up a producer.
3) Spin up a few consumers (e.g. 2).
4) Begin producing messages. I am using a Web API that acts as a producer: I post data to it and it produces the data to Kafka. My consumers save to a database and to Elasticsearch, and they also log what was consumed to the console.
5) Go idle for about 15 to 20 minutes, though this may be less for you.
6) You should see a flood of log messages like the ones I posted.
I would guess the 'error' is due to the brokers closing connections after 10 minutes because they've been idle (10 minutes is the broker default for connections.max.idle.ms). You'll see this message if all connections to all brokers are closed. You can safely ignore it: as you point out, it's not really an error, and the client will open the connections again as it needs them.
By default, librdkafka writes log messages to stderr. You can change that behavior by setting a log handler.
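As a rough illustration of setting a log handler, the sketch below routes the client's log lines and error notifications through your own callbacks instead of stderr. It assumes the 1.x `ProducerBuilder` API used in the question; the class name `LogHandlerSketch` and the output format are made up for the example, and whether the "brokers are down" message arrives through the log handler or the error handler may depend on the client version, so both are set here.

```csharp
using System;
using Confluent.Kafka;

class LogHandlerSketch
{
    static void Main()
    {
        var conf = new ProducerConfig { BootstrapServers = "kafka:9092,kafka1:9092" };

        using (var p = new ProducerBuilder<Null, string>(conf)
            // Receives librdkafka log lines instead of letting them go to stderr.
            .SetLogHandler((_, log) =>
                Console.WriteLine($"[{log.Level}] {log.Name} {log.Facility}: {log.Message}"))
            // Receives client-level error notifications; here they are only printed.
            .SetErrorHandler((_, err) =>
                Console.WriteLine($"Kafka client notice: {err.Reason} (fatal: {err.IsFatal})"))
            .Build())
        {
            p.Produce("my-topic", new Message<Null, string> { Value = "hello" });
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

In a real service you would likely forward these to your logging framework and filter by level rather than write to the console.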
Note: it looks like you are maintaining many producer instances - you most likely don't want to do this.
There are a number of things about errors/logging that I don't like (and am trying to get improved), including elements of what you point out. It's more complicated than it first seems, however.
@mhowlett What do you mean about maintaining many producer instances? We were looking at one producer per microservice instance.
Other than that, that sounds good.
you seem to have lots of producers:
rdkafka#producer-23
rdkafka#producer-24
rdkafka#producer-25
@mhowlett Interesting enough.
I only create one producer, in the ctor of my service, but the service is registered in IoC as transient, i.e. a new service instance for every request.
Should I have them as scoped or a singleton?
singleton - what you're doing now results in lots of producer instances that aren't getting disposed. It's very expensive to create one (and conversely, one producer instance can handle hundreds of thousands of requests per second).
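One way this could look with Microsoft.Extensions.DependencyInjection is sketched below. The extension method name `AddKafkaProducer` and the hard-coded bootstrap servers are assumptions for illustration; the thread does not say which container or configuration the service actually uses.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

public static class KafkaRegistration
{
    // Hypothetical helper; call it once where services are registered.
    public static IServiceCollection AddKafkaProducer(this IServiceCollection services)
    {
        var conf = new ProducerConfig { BootstrapServers = "kafka:9092,kafka1:9092" };

        // One producer for the whole process. Because the container creates it
        // through this factory, the container also disposes it on shutdown.
        return services.AddSingleton<IProducer<Null, string>>(
            _ => new ProducerBuilder<Null, string>(conf).Build());
    }
}
```

The request-handling service can stay transient or scoped as long as it takes `IProducer<Null, string>` as a constructor parameter instead of building its own producer.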