Sometimes I get this error:
Unhandled Exception: Confluent.Kafka.KafkaException: 10.244.3.12:9092/1: Disconnected (after 1438771ms in state UP)
   at Common.EventBus.KafkaProducer.<.ctor>b__4_1(IProducer`2 _, Error errorHandler)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Debian9.rd_kafka_poll(IntPtr rk, IntPtr timeout_ms)
   at Confluent.Kafka.Producer`2.<>c__DisplayClass24_0.b__0()
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
Constructor of my KafkaProducer class:
public KafkaProducer(EventBusOptions options, ILogger<KafkaProducer> logger)
{
    // Null arguments are reported with ArgumentNullException.
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    var producerConfig = new ProducerConfig
    {
        BootstrapServers = options.ConnectionString,
        EnableIdempotence = true,
        CompressionType = CompressionType.Snappy
    };

    _producer = new ProducerBuilder<string, string>(producerConfig)
        .SetLogHandler((_, logMessage) => _logger.LogInformation(logMessage.Message))
        .SetErrorHandler((_, errorHandler) =>
        {
            _logger.LogError(errorHandler.Reason);
            throw new KafkaException(errorHandler);
        })
        .Build();
}
It happens after sending a message like this:
public async Task Send(IntegrationEvent eventData)
{
    var jsonMessage = JsonConvert.SerializeObject(eventData, new JsonSerializerSettings
    {
        ContractResolver = new CamelCasePropertyNamesContractResolver()
    });

    var message = new Message<string, string>
    {
        Value = jsonMessage,
        Headers = new Headers
        {
            { "eventType", Encoding.UTF8.GetBytes(eventData.EventType.ToLowerInvariant()) }
        }
    };

    _producer.Produce(topicName, message, report =>
    {
        if (report.Error.IsError)
        {
            _logger.LogError(report.Error.Reason);
        }
        else
        {
            _logger.LogInformation(
                "Event message '{EventType}' with Id '{MessageId}' sent to topic '{TopicName}'",
                eventData.EventType, eventData.Id.ToString(), report.TopicPartitionOffset);
        }
    });
}
Log:
Mar 4, 2020 @ 12:07:14.214 | Event message '"con_integration"' with Id '"c846365a-dcc7-4a48-b6f3-5116aeb72b96"' sent to topic '"dev_con_integration [[11]] @164"'
Mar 4, 2020 @ 12:17:14.439 | 10.244.2.165:9092/0: Disconnected (after 992313ms in state UP)
Error notifications via SetErrorHandler are informational unless the IsFatal property is true (in which case you'll be notified via other exceptions anyway). You can just log them; you only need to add logic to handle specific errors if you have a specific need, which is unusual.
Thanks for the quick response.
Do you mean that this error is not fatal and I can ignore it?
.SetErrorHandler((_, errorHandler) =>
{
    _logger.LogError(errorHandler.Reason);
    // throw new KafkaException(errorHandler);
})
Will the producer still work after this error? I can't reproduce it on demand; it only happens occasionally.
It's not fatal unless errorHandler.IsFatal is true (that variable would be better named error, by the way). Currently that can only happen in very rare cases when you're using the idempotent producer.
If you do get a fatal error, every call to every method will also throw an exception. If you don't, it's fine to keep using the producer. The name 'error' here is unfortunate and is there for historical reasons. It's confusing, yes.
I can't check whether it is fatal because it is hard to reproduce. I thought you might know whether Disconnected (after 992313ms in state UP) is fatal or not.
I create the producer instance as a singleton.
That error is not fatal.
So I can still send messages after this error, right?
You should pretty much never get a fatal error, and everything will blow up if you do: the producer will be unusable.
Thank you very much.
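For reference, the advice in this thread could be sketched roughly as follows. The handler only logs and never throws: the stack trace above shows the handler running inside rd_kafka_poll on the producer's background poll thread, so an exception thrown there surfaces as an unhandled exception. The class name KafkaProducerFactory, the helper SeverityFor, and the choice of Warning/Critical log levels are illustrative assumptions, not part of the original code or of Confluent.Kafka.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

// Hypothetical helper for illustration; names and log levels are assumptions.
public static class KafkaProducerFactory
{
    // Non-fatal errors (such as a broker disconnect) are informational,
    // so they are logged as warnings; fatal errors are logged as critical.
    public static LogLevel SeverityFor(Error error) =>
        error.IsFatal ? LogLevel.Critical : LogLevel.Warning;

    public static IProducer<string, string> Create(string bootstrapServers, ILogger logger)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            EnableIdempotence = true
        };

        return new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, error) =>
            {
                // Log only. Throwing here would escape on the poll thread.
                logger.Log(SeverityFor(error), "Kafka error {Code}: {Reason}",
                    error.Code, error.Reason);
            })
            .Build();
    }
}
```

With a handler like this, a "Disconnected (after ...ms in state UP)" notification would show up as a warning in the log, and the singleton producer keeps being used as before.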