My producer and consumer are singletons, as recommended here.
How should I restart a producer/consumer (given that it is a singleton) after errors like
Local: Message timed out
Sometimes, after that error, other messages are not sent. I would expect the client to reopen its connections, but it doesn't.
Are there any suggestions or tips on which errors I should handle, which errors are retried automatically, and when I should restart the producer/consumer?
My app runs in a container, so I can simply throw to restart the container. When should I throw?
Please help with proper error handling. Let me know your opinion.
My producer code:
var producerConfig = new ProducerConfig
{
    BootstrapServers = options.ConnectionString,
    EnableIdempotence = true,
    Acks = Acks.All,
    CompressionType = CompressionType.Snappy
};

_producer = new ProducerBuilder<string, string>(producerConfig)
    .SetLogHandler((_, logMessage) => _logger.LogInformation("Kafka log: {Message}", logMessage.Message))
    .SetErrorHandler((_, error) =>
    {
        if (error.IsFatal)
        {
            _logger.LogError("Kafka fatal error: {Reason}", error.Reason);
        }
        else
        {
            // Log non-fatal errors as warnings only, so fatal errors are not logged twice.
            _logger.LogWarning("Kafka error: {Reason}", error.Reason);
        }
    })
    .Build();

// The delivery handler runs once per message with the final delivery outcome.
_producer.Produce(topicName, message, report =>
{
    if (report.Error.Code != ErrorCode.NoError)
    {
        _logger.LogError("Failed to send event message '{EventType}' to topic '{TopicName}'. Reason: {Reason}", eventData.EventType, topicName, report.Error.ToString());
    }
});
And my consumer code:
var consumerConfig = new ConsumerConfig
{
    GroupId = _options.ConsumerName,
    BootstrapServers = _options.ConnectionString,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

_consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetLogHandler((_, logMessage) => _logger.LogInformation("Kafka log: {Message}", logMessage.Message))
    .SetErrorHandler((_, error) =>
    {
        if (error.IsFatal)
        {
            _logger.LogError("Kafka fatal error: {Reason}", error.Reason);
            throw new KafkaException(error);
        }
        _logger.LogWarning("Kafka error: {Reason}", error.Reason);
    })
    .SetPartitionsAssignedHandler((c, partitions) => _logger.LogInformation("Kafka log: Assigned partitions: [{Partitions}]", string.Join(", ", partitions)))
    .Build();
public void Consume()
{
    // provides asynchronous message processing for different topics, but synchronous within each topic
    var pipeline = new Dictionary<string, ActionBlock<KafkaEventMessage>>();
    foreach (var subscription in _consumer.Subscription)
    {
        pipeline.Add(subscription, new ActionBlock<KafkaEventMessage>(async msg =>
        {
            await ProcessEvent(msg);
            // the offset is stored only after processing; auto-commit later commits the stored offsets
            _consumer.StoreOffset(msg.ConsumeResult);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
    }

    try
    {
        // start Poll Loop
        while (true)
        {
            KafkaEventMessage eventMsg = _consumer.Consume().ToEventMessage();
            _logger.LogDebug("Event {EventType} received from {TopicPartitionOffset}. Payload: {Payload}", eventMsg.EventType, eventMsg.TopicPartitionOffset, eventMsg.Value);
            pipeline[eventMsg.Topic].SendAsync(eventMsg).Wait();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Kafka consumer will be restarted due to non-retryable exception: {Message}", ex.Message);
        throw;
    }
}
Generally, the producer should recover from all errors, except those marked fatal.
Marking as enhancement, as we need to write more about error handling.
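The rule above (recover from everything except fatal errors) could be sketched roughly as follows. This is a minimal illustration, not an official pattern: the `ResilientProducer` class, its `Send` method, and the `onFatal` callback are hypothetical names introduced here, and it assumes the application restarts (for example by letting the container exit) only when `Error.IsFatal` is true.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical wrapper around a singleton producer; all names here are illustrative.
public sealed class ResilientProducer : IDisposable
{
    private readonly IProducer<string, string> _producer;

    // onFatal is an assumed hook, e.g. one that stops the host so the container restarts.
    public ResilientProducer(ProducerConfig config, Action<Error> onFatal)
    {
        _producer = new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, error) =>
            {
                if (error.IsFatal)
                {
                    // Fatal: this producer instance is not expected to recover on its own.
                    onFatal(error);
                }
                else
                {
                    // Non-fatal: informational only; the client keeps working and reconnects as needed.
                    Console.Error.WriteLine($"Kafka error (non-fatal): {error.Reason}");
                }
            })
            .Build();
    }

    public void Send(string topic, string key, string value)
    {
        var message = new Message<string, string> { Key = key, Value = value };

        _producer.Produce(topic, message, report =>
        {
            if (report.Error.IsError)
            {
                // A per-message failure such as Local_MsgTimedOut: this message was not
                // delivered, but the producer itself stays usable for later messages.
                Console.Error.WriteLine(
                    $"Delivery to '{topic}' failed: {report.Error.Code} ({report.Error.Reason})");
            }
        });
    }

    public void Dispose()
    {
        // Give in-flight messages a bounded amount of time to be delivered before shutdown.
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
```

In a containerized app, the caller might pass something like `error => Environment.Exit(1)` as `onFatal`, leaving all other errors to the client's own retry logic. Whether a message that failed delivery should be re-sent by the application is a separate decision that this sketch does not make.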
I very often get
Disconnected (after 5990635ms in state UP)
What about this error?
That's the broker killing the client connection because it has been inactive for 10 minutes. The client automatically reconnects if required.
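If these idle disconnects are only noise in the logs, one option is to stop the client from reporting them. The fragment below is a sketch under that assumption: it uses the `LogConnectionClose` client setting (librdkafka's `log.connection.close`), and the bootstrap address is a placeholder. The 10-minute idle limit itself is a broker-side setting (`connections.max.idle.ms`) and is not changed by this.

```csharp
using Confluent.Kafka;

var producerConfig = new ProducerConfig
{
    // Placeholder address, not taken from the original post.
    BootstrapServers = "localhost:9092",

    // Do not log broker-initiated connection closes; the client still
    // reconnects automatically when it next needs the connection.
    LogConnectionClose = false
};
```

The same property is available on `ConsumerConfig`, since both inherit it from the shared client configuration.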