Confluent-kafka-dotnet: How and when to restart producer/consumer after errors like "Local: Message timed out"

Created on 12 Mar 2020 · 4 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

My producer and consumer are singletons, as recommended here

How, then, should I restart the producer/consumer (if it is a singleton) after errors like

Local: Message timed out

Sometimes, after that error, subsequent messages are not sent. I think the client should reopen its connections, but it doesn't.

Are there any suggestions or tips on which errors I should handle, which errors are retried automatically, and when I should restart the producer/consumer?

My app runs in a container, so I can simply throw to restart the container. When should I throw?

How to reproduce

Please help with proper error handling. Let me know your opinion.

My producer code:

var producerConfig = new ProducerConfig
{
    BootstrapServers = options.ConnectionString,
    EnableIdempotence = true,
    Acks = Acks.All,
    CompressionType = CompressionType.Snappy
};

_producer = new ProducerBuilder<string, string>(producerConfig)
    .SetLogHandler((_, logMessage) => _logger.LogInformation("Kafka log: {Message}", logMessage.Message))
    .SetErrorHandler((_, error) =>
    {
        if (error.IsFatal)
        {
            _logger.LogError("Kafka fatal error: {Reason}", error.Reason);
            return; // avoid logging the same error a second time as a warning
        }
        _logger.LogWarning("Kafka error: {Reason}", error.Reason);
    })
    .Build();

_producer.Produce(topicName, message, report =>
{
    if (report.Error.Code != ErrorCode.NoError)
    {
        _logger.LogError("Failed to send event message '{EventType}' to topic '{TopicName}'. Reason: {Reason}", eventData.EventType, topicName, report.Error.ToString());
    }
});

And my consumer code:

var consumerConfig = new ConsumerConfig
{
    GroupId = _options.ConsumerName,
    BootstrapServers = _options.ConnectionString,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

_consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetLogHandler((_, logMessage) => _logger.LogInformation("Kafka log: {Message}", logMessage.Message))
    .SetErrorHandler((_, error) =>
    {
        if (error.IsFatal)
        {
            _logger.LogError("Kafka fatal error: {Reason}", error.Reason);
            throw new KafkaException(error);
        }
        _logger.LogWarning("Kafka error: {Reason}", error.Reason);
    })
    .SetPartitionsAssignedHandler((c, partitions) => _logger.LogInformation("Kafka log: Assigned partitions: [{Partitions}]", string.Join(", ", partitions)))
    .Build();

public void Consume()
{
    // provides asynchronous message processing for different topics, but synchronous within each topic
    var pipeline = new Dictionary<string, ActionBlock<KafkaEventMessage>>();
    foreach (var subscription in _consumer.Subscription)
    {
        pipeline.Add(subscription, new ActionBlock<KafkaEventMessage>(async msg =>
        {
            await ProcessEvent(msg);
            _consumer.StoreOffset(msg.ConsumeResult);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
    }

    try
    {
        // start Poll Loop
        while (true)
        {
            KafkaEventMessage eventMsg = _consumer.Consume().ToEventMessage();
            _logger.LogDebug("Event {EventType} received from {TopicName}. Payload: {Payload}", eventMsg.EventType, eventMsg.TopicPartitionOffset, eventMsg.Value);
            pipeline[eventMsg.Topic].SendAsync(eventMsg).Wait();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Kafka consumer will be restarted due to non-retryable exception: {Message}", ex.Message);
        throw;
    }
}

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version. 1.3.0
  • [x] Apache Kafka version. confluentinc/cp-kafka:5.0.1
  • [x] Operating system. Linux
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [x] Critical issue.
enhancement

All 4 comments

Generally, the producer should recover from all errors except those marked fatal.

Marking this as an enhancement, as we need to write more about error handling.
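
To make the fatal vs. non-fatal distinction concrete, here is a minimal sketch of how it could be wired into a singleton producer. It is not an official recommendation. ProducerHealth, ProducerSketch, Build and Send are hypothetical names invented for this example, and Console stands in for whatever logger you use. The idea: only an error with IsFatal set marks the producer as needing a restart, while a per-message failure such as ErrorCode.Local_MsgTimedOut ("Local: Message timed out") is reported and the same producer instance keeps being used.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper: remembers whether the client has reported a fatal error.
public sealed class ProducerHealth
{
    private int _fatal;
    public bool IsFatal => Volatile.Read(ref _fatal) == 1;
    public void MarkFatal() => Interlocked.Exchange(ref _fatal, 1);
}

public static class ProducerSketch
{
    public static IProducer<string, string> Build(ProducerConfig config, ProducerHealth health)
    {
        return new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, error) =>
            {
                if (error.IsFatal)
                {
                    // The instance is not expected to recover; let the host decide to restart.
                    Console.Error.WriteLine($"Kafka fatal error: {error.Reason}");
                    health.MarkFatal();
                }
                else
                {
                    // Informational: the client retries and reconnects on its own.
                    Console.WriteLine($"Kafka error (non-fatal): {error.Reason}");
                }
            })
            .Build();
    }

    public static void Send(IProducer<string, string> producer, ProducerHealth health,
                            string topic, string key, string value)
    {
        if (health.IsFatal)
        {
            // In a container, letting this propagate is one way to trigger a restart.
            throw new InvalidOperationException("Kafka producer is in a fatal state; restart required.");
        }

        var message = new Message<string, string> { Key = key, Value = value };
        producer.Produce(topic, message, report =>
        {
            if (!report.Error.IsError)
            {
                return; // delivered
            }
            if (report.Error.IsFatal)
            {
                health.MarkFatal();
            }
            // For Local_MsgTimedOut and other per-message failures the producer is
            // still usable; this message was not confirmed as delivered, so the
            // application decides whether to produce it again.
            Console.Error.WriteLine(
                $"Delivery failed for key '{report.Message.Key}': {report.Error.Code} ({report.Error.Reason})");
        });
    }
}
```

With this shape, "when should I throw?" reduces to checking health.IsFatal. Everything else is logged and left to the client's own retry and reconnect logic.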

I very often get

Disconnected (after 5990635ms in state UP)

What about this error?

That's the broker killing the client connection because it's been inactive for 10 minutes. The client automatically reconnects if required.
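
If these disconnect lines are just noise in your logs, one option is to turn off logging of broker disconnects. This is a config sketch only, assuming the LogConnectionClose property (librdkafka's log.connection.close) is available in your client version. QuietDisconnectsConfig and the bootstrap address are placeholders, not taken from the issue. It does not change the reconnect behaviour, only what gets logged.

```csharp
using Confluent.Kafka;

public static class QuietDisconnectsConfig
{
    public static ProducerConfig Create()
    {
        return new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder address
            // Do not log broker disconnects such as idle-connection closes.
            LogConnectionClose = false
        };
    }
}
```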
