Confluent-kafka-dotnet: Get length/count of out queue for producer.

Created on 26 Sep 2017 · 16 comments · Source: confluentinc/confluent-kafka-dotnet

Description

Really enjoying working with the confluent-kafka-dotnet library; however, for the life of me, I cannot work out how to get the length/count of the out queue for a producer.

I could handle this myself by doing something like:

config.Add("queue.buffering.max.messages", 1);

I would then manage the messages to be sent using my own queue. However, I see very poor performance/throughput, which I assume is because this essentially negates, or at least _slows down_, the batch sending of messages.

Any wisdom would be very appreciated.

Labels: MEDIUM, enhancement, librdkafka


All 16 comments

librdkafka does not expose a good number here, and so we don't either. The best you can do at the moment is to call Flush with a small timeout and look at the return value. What this value represents is documented in the method docs, but it is not particularly useful (it is not equal to the number of messages left to send). I'm interested to understand more about your use case and whether you really need this. And yes, doing anything to reduce batching will kill throughput.

@mhowlett I am using Kafka as a persistent buffer for log messages between various services and Logstash (Logstash has no persistent-queue support for TCP input). If there is an outage between the application doing the logging and the Kafka broker, I don't want to risk the application's memory being eaten up by buffering messages for the sake of logging.

I could set queue.buffering.max.messages to something sensible such as 250,000. This would allow for _short_ outages and ensure not too much memory is used.

Next, I want something watching the length of this queue so it can start making lots of noise if it starts to fill up, i.e. to allow our ops team to differentiate between a potential issue (warning) and something they actually have to go and do something about.
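The warning/critical split described above amounts to a small classification over how full the queue is. This is a minimal C sketch, assuming the current queue length is obtained elsewhere and the cap is the 250,000 figure mentioned above; `classify_queue_fill`, the `QUEUE_*` levels and the 50%/90% thresholds are hypothetical choices, not part of librdkafka or the .NET client.

```c
/* Hypothetical alert levels for ops tooling. */
enum queue_level { QUEUE_OK, QUEUE_WARNING, QUEUE_CRITICAL };

/* Classify how full the producer queue is relative to its cap.
 * The 50% (warning) and 90% (critical) thresholds are illustrative
 * assumptions; tune them to your own outage tolerance. */
enum queue_level classify_queue_fill(long queue_len, long queue_max) {
    if (queue_max <= 0)
        return QUEUE_CRITICAL;          /* misconfigured cap: be noisy */
    /* Multiply before dividing so small ratios don't truncate to 0. */
    long percent = (queue_len * 100) / queue_max;
    if (percent >= 90)
        return QUEUE_CRITICAL;          /* ops need to act now */
    if (percent >= 50)
        return QUEUE_WARNING;           /* potential issue */
    return QUEUE_OK;
}
```

With a cap of 250,000, a length of 130,000 (52%) classifies as a warning and 240,000 (96%) as critical. Keeping the thresholds well below 100% leaves time to react before produce calls start failing because the local queue is full.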

For now I have put my own queue, which I can monitor, in front of the producer and set queue.buffering.max.messages to 5000, which seems to give good enough throughput. Perhaps I'm missing a trick?

I'd just use a counter that increments when you send a message and decrements when a delivery report is received (either via the callback or an awaited ProduceAsync request).
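A minimal C sketch of that counter, assuming delivery reports may be handled on a different thread from the one producing (hence C11 atomics). The `outq_tracker` type and its functions are hypothetical application code, not a librdkafka or confluent-kafka-dotnet API; in .NET the same idea maps onto Interlocked.Increment/Interlocked.Decrement. A message whose produce call fails synchronously (for example because the local queue is full) never receives a delivery report, so the sketch decrements for that case too.

```c
#include <stdatomic.h>

/* Hypothetical application-side tracker for messages handed to the
 * producer whose delivery report has not yet been handled. */
typedef struct {
    atomic_long in_flight;   /* produced, awaiting delivery report */
    atomic_long delivered;   /* delivery reports indicating success */
    atomic_long failed;      /* delivery reports indicating an error */
} outq_tracker;

void outq_tracker_init(outq_tracker *t) {
    atomic_init(&t->in_flight, 0);
    atomic_init(&t->delivered, 0);
    atomic_init(&t->failed, 0);
}

/* Call immediately before handing a message to the producer. */
void outq_on_produce(outq_tracker *t) {
    atomic_fetch_add(&t->in_flight, 1);
}

/* Call if the produce call itself fails synchronously: no delivery
 * report will follow for that message. */
void outq_on_produce_failed(outq_tracker *t) {
    atomic_fetch_sub(&t->in_flight, 1);
}

/* Call from the delivery-report callback; ok != 0 means success. */
void outq_on_delivery_report(outq_tracker *t, int ok) {
    atomic_fetch_sub(&t->in_flight, 1);
    if (ok)
        atomic_fetch_add(&t->delivered, 1);
    else
        atomic_fetch_add(&t->failed, 1);
}

/* Number of messages currently awaiting a delivery report. */
long outq_length(outq_tracker *t) {
    return atomic_load(&t->in_flight);
}
```

Unlike the librdkafka out-queue count, this number has exactly one meaning: messages the application has produced but not yet seen a delivery report for.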

Yep, it would be useful if the library exposed some more meaningful counts reflecting internal state.

Why not just expose outq_len()?

This is not exposed (except as the return value of Flush, which is marked as unstable because of it) because I viewed it as depending too much on librdkafka internals. I think it is better to expose more meaningful numbers (even if these require O(n) computation; I haven't yet looked at what would be involved). I chose to expose nothing at all rather than something that's not straightforward to interpret.

        ///     The current librdkafka out queue length. This should be interpreted
        ///     as a rough indication of the number of messages waiting to be sent
        ///     to or acknowledged by the broker. If zero, there are no outstanding
        ///     messages or callbacks. Specifically, the value is equal to the sum
        ///     of the number of produced messages for which a delivery report has
        ///     not yet been handled and a number which is less than or equal to the
        ///     number of pending delivery report callback events (as determined by
        ///     an internal librdkafka implementation detail).

I think outq_len()'s accuracy is sufficient for the use case described by @matth0x1.

A rough indication of the out queue length would be much preferred over having to maintain additional code outside of the client to track the length.

Noted. If we call the property something that sounds internal like InternalQueueLength or InternalOutQueueLength or LibrdkafkaOutQueueLength and have a plan for how this name fits with other related properties that may ultimately get added, then I'm happy to add this now (it's a trivial addition). Will try to sort this out in the short term.

I feel that you might be overly cautious on this issue, Matt.

Below is the corresponding librdkafka function. It has been around for many years without controversy, so I think we can safely call it just OutQueueLength in the .NET client, without the Internal or Librdkafka prefix, and add a note explaining what it actually means.

/**
 * @brief Returns the current out queue length.
 *
 * The out queue contains messages waiting to be sent to, or acknowledged by,
 * the broker.
 *
 * An application should wait for this queue to reach zero before terminating
 * to make sure outstanding requests (such as offset commits) are fully
 * processed.
 *
 * @returns number of messages in the out queue.
 */
RD_EXPORT
int         rd_kafka_outq_len(rd_kafka_t *rk);

I place very high priority on not confusing users :-).

As we just discussed, it'll be a simple change to provide another method in librdkafka that doesn't expose librdkafka internals, and I think we should do that and not expose this number.

Note to anyone reading along: rd_kafka_outq_len is an appropriate value to compare to 0 when implementing Flush, which is why it's defined the way it is.
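To see why the count has to include pending delivery-report callbacks, here is a toy C simulation of a Flush-style loop. `sim_producer`, `sim_poll` and `sim_flush` are hypothetical stand-ins, not librdkafka functions, and a budget of poll iterations replaces the real timeout so the behaviour is deterministic.

```c
/* Hypothetical stand-in for a producer handle. */
typedef struct {
    int unacked;            /* queued or sent, not yet acked by broker */
    int pending_callbacks;  /* acked, delivery report not yet served */
} sim_producer;

/* Mirrors the definition quoted above: undelivered messages plus
 * delivery-report events still waiting to be served. */
int sim_outq_len(const sim_producer *p) {
    return p->unacked + p->pending_callbacks;
}

/* One poll: serve queued delivery reports, then pretend the broker
 * acks one more message, which queues a new report for the next poll. */
void sim_poll(sim_producer *p) {
    p->pending_callbacks = 0;          /* callbacks run here */
    if (p->unacked > 0) {
        p->unacked--;
        p->pending_callbacks++;        /* report for that ack */
    }
}

/* Flush-style loop: poll until the count reaches zero or the budget
 * (standing in for a timeout) runs out; return what is left. */
int sim_flush(sim_producer *p, int max_polls) {
    for (int i = 0; i < max_polls && sim_outq_len(p) > 0; i++)
        sim_poll(p);
    return sim_outq_len(p);
}
```

Starting from `{3, 0}`, `sim_flush(&p, 10)` returns 0 after four polls with every report served. If `sim_outq_len` counted only `unacked`, the loop would stop after the third poll with one delivery report never served, which is exactly what comparing the full count against 0 avoids.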

We are also very interested in this feature.
Our producers have very high throughput, and we need to track their state and performance all the time. Until now we have had our own buffer set up in front of the producer, emitting TP and queue-length statistics every few seconds, but that indeed slows everything down.

Are there any plans to include this in an upcoming release?

@mohoch1, you wrote: "but that indeed slows everything down."

Are you using OnStatistics? And do you see a slow-down when using it?

I don't see that the OnStatistics event provides any stats on the producer itself. It only describes the state of the cluster and topics.

Like the original question, I'm trying to get the current output queue length of the producer, and would also like to have a throughput metric.
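A throughput figure can be derived from any cumulative count by sampling it at intervals and dividing the difference by the elapsed time. A minimal C sketch, assuming the application already keeps such a count (for instance, the number of delivery reports handled) and a monotonic millisecond clock; `counter_sample` and `throughput_per_sec` are hypothetical names.

```c
/* One sample of a cumulative counter taken at a point in time. */
typedef struct {
    long long count;     /* cumulative messages delivered so far */
    long long time_ms;   /* monotonic timestamp in milliseconds */
} counter_sample;

/* Messages per second between two samples. Returns 0.0 when the
 * interval is not positive (duplicate or out-of-order samples). */
double throughput_per_sec(counter_sample prev, counter_sample cur) {
    long long dt_ms = cur.time_ms - prev.time_ms;
    if (dt_ms <= 0)
        return 0.0;
    return (double)(cur.count - prev.count) * 1000.0 / (double)dt_ms;
}
```

For example, going from 1,000 to 6,000 delivered messages over 5,000 ms gives 1,000 messages per second.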

As @mhowlett suggested, a property named InternalOutQueueLength, with that meaning, would be great.

OnStatistics will provide what you need; the full spec is here:
https://github.com/edenhill/librdkafka/wiki/Statistics

The msg_cnt value is the number of messages currently handled by librdkafka:

  • messages in the producer queue
  • messages in flight to/from the broker
  • message acks/delivery reports received but not yet polled by the application

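In .NET the statistics arrive as a JSON string in the OnStatistics handler, and librdkafka only emits them when `statistics.interval.ms` is set to a non-zero value. The C sketch below is a deliberately naive extraction of the top-level `msg_cnt` field, assuming the key appears literally as `"msg_cnt"` and that its first occurrence is the top-level one; `stats_msg_cnt` is a hypothetical helper, and real code should use a proper JSON parser.

```c
#include <stdlib.h>
#include <string.h>

/* Naive extraction of the "msg_cnt" value from a statistics JSON
 * string. Returns -1 if the key or a numeric value is not found. */
long stats_msg_cnt(const char *json) {
    static const char key[] = "\"msg_cnt\"";
    const char *p = strstr(json, key);  /* "outbuf_msg_cnt" won't match */
    if (p == NULL)
        return -1;
    p += sizeof(key) - 1;               /* skip past the quoted key */
    while (*p == ' ' || *p == '\t' || *p == '\n' || *p == '\r')
        p++;
    if (*p != ':')
        return -1;
    p++;
    char *end;
    long value = strtol(p, &end, 10);   /* strtol skips leading spaces */
    if (end == p)
        return -1;                      /* no digits after the colon */
    return value;
}
```

The leading quote in the search key is what keeps nested fields such as `outbuf_msg_cnt` from matching.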
Great.
That would do it.

Thanks

Re-classifying as an enhancement, as it would be good to expose the queue length as I describe above, and I believe this would be quite straightforward.

