Confluent-kafka-dotnet: Unhandled Exception: Confluent.Kafka.KafkaException: Disconnected after ms in state up

Created on 4 Mar 2020 · 10 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

Sometimes I get this error:

Unhandled Exception: Confluent.Kafka.KafkaException: 10.244.3.12:9092/1: Disconnected (after 1438771ms in state UP)
at Common.EventBus.KafkaProducer.<.ctor>b__4_1(IProducer`2 _, Error errorHandler)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Debian9.rd_kafka_poll(IntPtr rk, IntPtr timeout_ms)
at Confluent.Kafka.Producer`2.<>c__DisplayClass24_0.b__0()
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)

Constructor of my KafkaProducer class:

public KafkaProducer(EventBusOptions options, ILogger<KafkaProducer> logger)
{
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    var producerConfig = new ProducerConfig
    { 
        BootstrapServers = options.ConnectionString, 
        EnableIdempotence = true,
        CompressionType = CompressionType.Snappy
    };

    _producer = new ProducerBuilder<string, string>(producerConfig)
        .SetLogHandler((_, logMessage) => _logger.LogInformation(logMessage.Message))
        .SetErrorHandler((_, errorHandler) =>
        {
            _logger.LogError(errorHandler.Reason);
            throw new KafkaException(errorHandler);
        })
        .Build();
}

How to reproduce

After sending a message like this:

public async Task Send(IntegrationEvent eventData)
{
    var jsonMessage = JsonConvert.SerializeObject(eventData, new JsonSerializerSettings
    {
        ContractResolver = new CamelCasePropertyNamesContractResolver()
    });

    var message = new Message<string, string>
    {
        Value = jsonMessage,
        Headers = new Headers
        {
            {"eventType", Encoding.UTF8.GetBytes(eventData.EventType.ToLowerInvariant()) }
        }
    };

    _producer.Produce(topicName, message, report =>
    {
        if (report.Error.IsError)
        {
            _logger.LogError(report.Error.Reason);
        }
        else
        {
            _logger.LogInformation("Event message '{EventType}' with Id '{MessageId}' sent to topic '{TopicName}'", eventData.EventType, eventData.Id.ToString(), report.TopicPartitionOffset);
        }
    });
}

Log

Mar 4, 2020 @ 12:07:14.214 | Event message '"con_integration"' with Id '"c846365a-dcc7-4a48-b6f3-5116aeb72b96"' sent to topic '"dev_con_integration [[11]] @164"'
Mar 4, 2020 @ 12:17:14.439 | 10.244.2.165:9092/0: Disconnected (after 992313ms in state UP)

Checklist

  • Confluent.Kafka nuget version 1.3.0
  • Apache Kafka image: confluentinc/cp-kafka:5.0.1
  • Operating system: Linux
  • [x] Critical issue.
question

All 10 comments

Error notifications via SetErrorHandler are informational unless the IsFatal property is true (in which case you'll be notified via other exceptions anyway). You can just log them; you only need to add logic to handle specific ones if you have a specific need (which is unusual).

Thanks for the quick response.
Do you mean that this error is not fatal and I can ignore it?

.SetErrorHandler((_, errorHandler) =>
        {
            _logger.LogError(errorHandler.Reason);
            // throw new KafkaException(errorHandler);
        })

Will the producer keep working after this error? I can't reproduce it on demand; it only happens occasionally.

It's not fatal unless errorHandler.IsFatal is true (that variable would be better named error, by the way). Currently that can only happen in very rare cases when you're using the idempotent producer.

If you do get a fatal error, every call to every method will also throw an exception. If you don't, it's fine to keep using the producer. The name 'error' here is unfortunate and exists for historical reasons. It's confusing, yes.
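For anyone landing here later, a minimal sketch of an error handler along the lines described above: it logs every notification, marks the fatal ones, and never throws. The stack trace in the description shows the exception being raised from inside the handler itself (on the poll thread), so not throwing there is the key change. The names `ProducerFactory`, `Create`, and the `log` delegate are hypothetical and only used for illustration; `SetErrorHandler`, `Error.IsFatal`, `Error.Code`, and `Error.Reason` are the Confluent.Kafka members discussed in this thread.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper type, not part of Confluent.Kafka.
public static class ProducerFactory
{
    public static IProducer<string, string> Create(string bootstrapServers, Action<string> log)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            EnableIdempotence = true
        };

        return new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, error) =>
            {
                // Notifications are informational unless IsFatal is true.
                var severity = error.IsFatal ? "fatal" : "non-fatal";
                log($"Kafka {severity} error {error.Code}: {error.Reason}");

                // Deliberately no throw: this callback runs on the client's
                // poll thread, and an exception here is unhandled.
            })
            .Build();
    }
}
```

A caller could pass `Console.WriteLine` or a wrapper around `ILogger` as the `log` argument; whether to alert on the fatal case is left open here.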

I can't check whether it is fatal because it is hard to reproduce. I thought you might know whether Disconnected (after 992313ms in state UP) is fatal or not.

I create the producer instance as a singleton.

That error is not fatal.

So I can still send messages after this error, right?

You should pretty much never get a fatal error, and everything will blow up if you do: the producer will be unusable.
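As a rough illustration of what "everything will blow up" can look like from the calling side, here is a sketch that wraps `Produce` and treats only fatal errors as a signal that the producer is no longer usable. `SafeProduce` and `TryProduce` are hypothetical names, and what the caller does after a `false` result (for example, disposing and recreating the producer) is an assumption rather than something stated in this thread.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper type, not part of Confluent.Kafka.
public static class SafeProduce
{
    // Returns false only when a fatal error was reported, meaning the
    // producer instance should be considered unusable.
    public static bool TryProduce(
        IProducer<string, string> producer,
        string topic,
        string value,
        Action<string> log)
    {
        try
        {
            producer.Produce(topic, new Message<string, string> { Value = value }, report =>
            {
                // Per-message delivery result, reported asynchronously.
                if (report.Error.IsError)
                {
                    log($"Delivery failed: {report.Error.Reason}");
                }
            });
            return true;
        }
        catch (KafkaException ex) when (ex.Error.IsFatal)
        {
            log($"Fatal producer error: {ex.Error.Reason}");
            return false;
        }
        // Non-fatal KafkaExceptions are not caught here and propagate
        // to the caller unchanged.
    }
}
```

A transient broker disconnect like the one in the description would not reach the `catch` block at all; it only shows up as a notification in the error handler.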

Thank you very much.
