Confluent-kafka-dotnet: QueryWatermarkOffsets throws "Broker: Leader not available" if proceeding too fast

Created on 3 Mar 2017 · 14 Comments · Source: confluentinc/confluent-kafka-dotnet

The following has been observed using Confluent.Kafka 0.9.4-preview8 against Kafka_2.12-0.10.2.0 on Windows 10 with the following code:

using System.Collections.Generic;
using Confluent.Kafka;

// clusterConnectionString holds the broker address (here a single node, "localhost:9092").
var config = new Dictionary<string, object>()
{
    { "enable.auto.commit", false },
    { "group.id", "Foo" },
    { "bootstrap.servers", clusterConnectionString }
};
using (var consumer = new Consumer(config))
{
    // The following line throws, but not if a breakpoint is placed on it.
    var mark = consumer.QueryWatermarkOffsets(new TopicPartition("Bar", 0));
}

The line with QueryWatermarkOffsets throws the exception if the code runs uninterrupted. The topic "Bar" exists and has low/high watermarks of 0 and 3. Placing a breakpoint on that line avoids the problem. Assuming this was a timing issue between some asynchronous initialization of the consumer's metadata and the call to QueryWatermarkOffsets, I tried looping and catching the specific error with a short Thread.Sleep, hoping it would eventually go away. The problem persisted after ten 1-second sleeps.
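The workaround described above (catch the error, sleep, try again) can be sketched as a small generic helper. This is a minimal sketch, not part of Confluent.Kafka: the `FixedRetry` class and its parameters are hypothetical, and deciding which exceptions count as retriable is left to the caller.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not a Confluent.Kafka API): retries an operation a fixed
// number of times with a fixed pause, mirroring the "catch, sleep, retry" loop.
public static class FixedRetry
{
    public static T Run<T>(Func<T> operation, Func<Exception, bool> isRetriable,
                           int maxAttempts, TimeSpan pause)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return operation();
            }
            // Only swallow errors the caller considers transient, and only while
            // attempts remain; otherwise the exception propagates unchanged.
            catch (Exception ex) when (isRetriable(ex) && attempt < maxAttempts)
            {
                Thread.Sleep(pause);
            }
        }
    }
}
```

Applied to the repro, a call might look like `FixedRetry.Run(() => consumer.QueryWatermarkOffsets(new TopicPartition("Bar", 0)), ex => ex is KafkaException, 10, TimeSpan.FromSeconds(1))`, assuming the binding surfaces the broker error as a `KafkaException`. As reported above, ten 1-second attempts were not enough in this environment, so a fixed attempt count is a fragile bound.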

Perhaps some kind of internal retry or cache refresh is needed?

Labels: MEDIUM, enhancement


All 14 comments

I am experiencing the same problem with the producer's QueryWatermarkOffsets.

Is "Bar" an existing topic?

Hi @HenningRoigaard - yes, I can replicate it. I believe this is simply because the QueryWatermarkOffsets method is called before topic metadata has been retrieved from the broker. Note that this method wraps a similarly named method in librdkafka and turns any returned error code into an exception. In C, returning an error code in this scenario probably makes sense. At a higher level, an exception does seem a bit of an extreme reaction. I think an ideal API would at least block a little longer.

For now, you can safely delay and/or handle this exception. I will talk to @edenhill more about whether we want to make changes related to the discussion above (and if so, where: in the higher-level clients or in librdkafka).
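One way to "delay and/or handle this exception" is to retry until an overall time budget is spent, doubling the pause between attempts up to a cap. This is a sketch under stated assumptions: `DeadlineRetry` and its parameters are hypothetical (not a Confluent.Kafka API), and which exceptions are treated as transient is supplied by the caller.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper (not a Confluent.Kafka API): retries until a time budget
// would be exceeded, with exponential backoff capped at maxDelay.
// initialDelay should be positive; a zero delay never grows.
public static class DeadlineRetry
{
    public static T Run<T>(Func<T> operation, Func<Exception, bool> isRetriable,
                           TimeSpan budget, TimeSpan initialDelay, TimeSpan maxDelay)
    {
        var clock = Stopwatch.StartNew();
        var delay = initialDelay;
        while (true)
        {
            try
            {
                return operation();
            }
            // Retry only transient errors, and only if the next sleep still fits
            // inside the budget; otherwise let the exception propagate.
            catch (Exception ex) when (isRetriable(ex) && clock.Elapsed + delay < budget)
            {
                Thread.Sleep(delay);
                // Double the pause for the next round, but never beyond maxDelay.
                var next = TimeSpan.FromTicks(delay.Ticks * 2);
                delay = next < maxDelay ? next : maxDelay;
            }
        }
    }
}
```

Bounding by elapsed time rather than attempt count makes the worst-case wait explicit. Later comments in this thread suggest the time needed varies between environments, so the budget would have to be tuned rather than assumed.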

I can't reproduce this with librdkafka.

Some questions for @HenningRoigaard

  • is "Bar" an existing topic?
  • how many brokers do you have in the cluster?
  • do you set all brokers in bootstrap.servers, or a sub-set?

This issue reliably happens for me under the following conditions: a single broker, "Bar" is an existing topic, and the call is made from a net451 app in a Windows 10 VM connecting to a Kafka broker running on the host computer.

How long does it take for the call to fail?
Can you reproduce this with debug=metadata,protocol,topic and provide the logs?

Ahh, good plan. What about log_level? :-)
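The requested debug settings can be added to the repro's config dictionary. `debug` and `log_level` are librdkafka configuration properties; this sketch assumes the .NET client forwards them to librdkafka like the other keys. The `DebugConfig` class is hypothetical. Level 7 is the syslog debug level, matching the leading `7|` on the log lines posted in this thread.

```csharp
using System.Collections.Generic;

// Hypothetical helper: the consumer config from the original report, plus
// librdkafka debug settings (assumed to be passed through unchanged).
public static class DebugConfig
{
    public static Dictionary<string, object> Create(string bootstrapServers)
    {
        return new Dictionary<string, object>
        {
            { "enable.auto.commit", false },
            { "group.id", "Foo" },
            { "bootstrap.servers", bootstrapServers },
            // Debug contexts requested above: metadata, protocol and topic.
            { "debug", "metadata,protocol,topic" },
            // syslog-style level 7 = debug, the level debug messages are logged at.
            { "log_level", 7 }
        };
    }
}
```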

It fails immediately; it does not block at all.

7|2017-03-05 19:42:34.356|rdkafka#consumer-1|METADATA| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
7|2017-03-05 19:42:34.407|rdkafka#consumer-1|CONNECTED| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Connected (#1)
7|2017-03-05 19:42:34.408|rdkafka#consumer-1|APIVERSION| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Using (configuration fallback) 0.9.0 protocol features
7|2017-03-05 19:42:35.272|rdkafka#consumer-1|METADATA| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Request metadata for all topics: connected

debug = all (just in case)

7|2017-03-05 19:43:47.028|rdkafka#consumer-1|MEMBERID| [thrd:app]: Group "Foo": updating member id "(not-set)" -> ""
7|2017-03-05 19:43:47.041|rdkafka#consumer-1|BROKER| [thrd:app]: 192.168.43.9:9092/bootstrap: Added new broker with NodeId -1
7|2017-03-05 19:43:47.049|rdkafka#consumer-1|BRKMAIN| [thrd::0/internal]: :0/internal: Enter main broker thread
7|2017-03-05 19:43:47.049|rdkafka#consumer-1|STATE| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
7|2017-03-05 19:43:47.049|rdkafka#consumer-1|BRKREASSIGN| [thrd:main]: Group "Foo" management reassigned from broker (none) to :0/internal
7|2017-03-05 19:43:47.053|rdkafka#consumer-1|CGRPSTATE| [thrd:main]: Group "Foo" changed state init -> wait-broker (v1, join-state init)
7|2017-03-05 19:43:47.053|rdkafka#consumer-1|BROADCAST| [thrd:main]: Broadcasting state change
7|2017-03-05 19:43:47.052|rdkafka#consumer-1|BROADCAST| [thrd::0/internal]: Broadcasting state change
7|2017-03-05 19:43:47.053|rdkafka#consumer-1|BRKASSIGN| [thrd:main]: Group "Foo" management assigned to broker :0/internal
7|2017-03-05 19:43:47.075|rdkafka#consumer-1|BRKMAIN| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Enter main broker thread
7|2017-03-05 19:43:47.078|rdkafka#consumer-1|CONNECT| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: broker in state INIT connecting
7|2017-03-05 19:43:47.052|rdkafka#consumer-1|METADATA| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
7|2017-03-05 19:43:47.082|rdkafka#consumer-1|CONNECT| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Connecting to ipv4#192.168.43.9:9092 (plaintext) with socket 1460
7|2017-03-05 19:43:47.088|rdkafka#consumer-1|STATE| [thrd:192.168.43.9:9092/bootstrap]: 192.168.43.9:9092/bootstrap: Broker changed state INIT -> CONNECT
7|2017-03-05 19:43:47.918|rdkafka#consumer-1|BROADCAST| [thrd:192.168.43.9:9092/bootstrap]: Broadcasting state change

@edenhill: I have a single-node cluster, "Bar" exists, and bootstrap.servers contains the single node address "localhost:9092". Everything works as expected when I debug and break at the query line. I expect this is due to a race condition somewhere in Confluent.Kafka/LibRdKafka.

@HenningRoigaard Why did you choose to close this issue?

Hmm, I must have closed it by mistake. I did not intend to. The issue has been reopened.

@mhowlett This is a great candidate for a small, reproducible unit test.

Any updates on the above issue? I am facing the same exact problem.

Shouldn't this rather be labeled as bug instead of enhancement? I'm facing the same issue. Using Confluent.Kafka 0.11 connecting to Kafka 0.9.0.1

I'm experiencing the same issue. If I catch the exception and sleep 10 × 500 ms (5 s in total), it still fails, but with 20 × 500 ms (10 s) it returns the expected data. As I'm trying to use this in a system for monitoring Kafka performance, introducing delays isn't really a good solution to the problem.
