Confluent-kafka-dotnet: Bulk Insertion API in confluent-kafka

Created on 17 Apr 2019 · 15 comments · Source: confluentinc/confluent-kafka-dotnet

Does confluent-kafka-dotnet currently support a bulk producer?



The client abstracts away the batching of messages (LingerMs is a key configuration property). We don't have any plans to expose a batching API; you could write a simple wrapper yourself if you need that abstraction.
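Because batching happens inside the client, it is tuned through configuration rather than through a separate API. A minimal sketch, assuming the Confluent.Kafka 1.x `ProducerConfig` properties `LingerMs` (maps to `linger.ms`) and `BatchNumMessages` (maps to `batch.num.messages`); the helper name `ProducerSettings` and the numeric values are illustrative only, not recommendations.

```csharp
using Confluent.Kafka;

public static class ProducerSettings
{
    // Hypothetical helper; the values below are illustrative and should be tuned per workload.
    public static ProducerConfig Create(string bootstrapServers) => new ProducerConfig
    {
        BootstrapServers = bootstrapServers,
        // linger.ms: how long the client waits for messages to accumulate
        // in the queue before building a MessageSet (batch) to send.
        LingerMs = 5,
        // batch.num.messages: maximum number of messages in one MessageSet.
        BatchNumMessages = 10000
    };
}
```

A higher `linger.ms` generally trades a little latency for larger batches; the producer is then used exactly as before, one `Produce`/`ProduceAsync` call per message.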

Hi @mhowlett - we have a similar need. Below are some extension methods I wrote to provide batch operations. If you have a moment to glance over them, I'd love to hear any feedback you have about potential pitfalls in my approach. I feel especially uncertain about my handling of delivery reports in the bulk producer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;

namespace Utilities.Kafka
{
    // Exception type thrown by ProduceBatch below (defined here; it is not part of Confluent.Kafka).
    public class KafkaProduceException : Exception
    {
        public KafkaProduceException(string message) : base(message) { }
    }

    public static class KafkaBatchingExtensions
    {
        // It is strongly recommended to only use this with consumers configured with `enable.auto.offset.store=false`
        // since some of the consumes in the batch may succeed prior to encountering an exception, without the caller
        // ever having seen the messages.
        public static IEnumerable<ConsumeResult<TKey, TVal>> ConsumeBatch<TKey, TVal>(this IConsumer<TKey, TVal> consumer,
            TimeSpan maxWaitTime, int maxBatchSize, CancellationTokenSource cts = null)
        {
            var waitBudgetRemaining = maxWaitTime;
            var deadline = DateTime.UtcNow + waitBudgetRemaining;
            var res = new List<ConsumeResult<TKey, TVal>>();
            var resSize = 0;

            while (waitBudgetRemaining > TimeSpan.Zero && DateTime.UtcNow < deadline && resSize < maxBatchSize)
            {
                cts?.Token.ThrowIfCancellationRequested();
                var msg = consumer.Consume(waitBudgetRemaining);

                if (msg != null && !msg.IsPartitionEOF)
                {
                    res.Add(msg);
                    resSize++;
                }

                waitBudgetRemaining = deadline - DateTime.UtcNow;
            }

            return res;
        }

        // This overload just defaults the `flushTimeout` to 10 seconds
        public static void ProduceBatch<TKey, TVal>(this IProducer<TKey, TVal> producer, string topic,
            IEnumerable<Message<TKey, TVal>> messages, CancellationTokenSource cts = null)
        {
            producer.ProduceBatch(topic, messages, TimeSpan.FromSeconds(10), cts);
        }

        public static void ProduceBatch<TKey, TVal>(this IProducer<TKey, TVal> producer, string topic,
            IEnumerable<Message<TKey, TVal>> messages, TimeSpan flushTimeout, CancellationTokenSource cts = null)
        {
            var errorReports = new ConcurrentQueue<DeliveryReport<TKey, TVal>>();
            var reportsExpected = 0;
            var reportsReceived = 0;

            // Delivery handlers may run on a different thread from this one (e.g. the
            // producer's background poll thread), so the counter is updated atomically.
            void DeliveryHandler(DeliveryReport<TKey, TVal> report)
            {
                Interlocked.Increment(ref reportsReceived);

                if (report.Error.IsError)
                {
                    errorReports.Enqueue(report);
                }
            }

            foreach (var message in messages)
            {
                producer.Produce(topic, message, DeliveryHandler);
                reportsExpected++;
            }

            var deadline = DateTime.UtcNow + flushTimeout;
            const int flushWaitMs = 100;

            // Flush in short slices so cancellation is checked between them.
            while (DateTime.UtcNow < deadline && Volatile.Read(ref reportsReceived) < reportsExpected)
            {
                cts?.Token.ThrowIfCancellationRequested();
                producer.Flush(TimeSpan.FromMilliseconds(flushWaitMs));
            }

            if (!errorReports.IsEmpty)
            {
                throw new AggregateException($"{errorReports.Count} Kafka produce(s) failed. Up to 10 inner exceptions follow.",
                    errorReports.Take(10).Select(i => new KafkaProduceException(
                        $"A Kafka produce error occurred. Topic: {topic}, Message key: {i.Message.Key}, Code: {i.Error.Code}, Reason: " +
                        $"{i.Error.Reason}, IsBroker: {i.Error.IsBrokerError}, IsLocal: {i.Error.IsLocalError}, IsFatal: {i.Error.IsFatal}"
                    ))
                );
            }

            var received = Volatile.Read(ref reportsReceived);
            if (received < reportsExpected)
            {
                var msg = $"Kafka producer flush did not complete within the timeout; only received {received} " +
                          $"delivery reports out of {reportsExpected} expected.";
                throw new KafkaProduceException(msg);
            }
        }
    }
}
```

Hi @mhowlett - we currently have a similar need to @woodlee.

We have an existing program that processes messages in batches. We require a batch to be fully processed before starting a new one. This includes writing to Kafka (which we want to integrate into this program).

As I understand it, there is no plan to expose a batching API. Would you accept a PR that introduces such a feature?

Regardless of whether you would accept a batching-API PR, could you provide the feedback @woodlee requested, or suggest a preferred way of doing this using kafka-dotnet?

A batch produce API would wrongly suggest a level of atomicity that does not reflect the actual message-based delivery performed by the client, which leads to bad assumptions by the user and later breakage.

What you probably want is transaction support, allowing you to commit or abort a set of messages as a whole. This is on the roadmap but no firm date yet.

Something like Task.WhenAll(batch.Select(msg => p.ProduceAsync(topic, msg))); will give you a task that completes when all messages in a batch have been sent. You can guarantee their ordering (within each partition) by enabling idempotence. The task's exception will be set if there's a problem, and you can await the task.
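A slightly fuller sketch of that suggestion, assuming the Confluent.Kafka 1.x API (`ProducerBuilder`, and `ProduceAsync` returning `Task<DeliveryResult<TKey, TValue>>`). The helper `ProduceAllAsync`, the topic name `my-topic`, and the broker address `localhost:9092` are hypothetical placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class BatchProduce
{
    // Hypothetical helper: one ProduceAsync call per message, combined with Task.WhenAll.
    // The client still groups these messages into MessageSets internally (see linger.ms).
    public static Task<DeliveryResult<TKey, TValue>[]> ProduceAllAsync<TKey, TValue>(
        IProducer<TKey, TValue> producer, string topic, IEnumerable<Message<TKey, TValue>> batch)
    {
        // ToList() starts every ProduceAsync call before we wait on any of them.
        var deliveries = batch.Select(msg => producer.ProduceAsync(topic, msg)).ToList();
        // Completes when every delivery result has arrived; faults if any message failed.
        return Task.WhenAll(deliveries);
    }

    // Example usage, assuming a broker is reachable at localhost:9092.
    public static async Task RunExampleAsync()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            EnableIdempotence = true // preserves per-partition ordering across retries
        };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            var batch = Enumerable.Range(0, 1000)
                .Select(i => new Message<Null, string> { Value = $"message {i}" });
            try
            {
                var results = await ProduceAllAsync(producer, "my-topic", batch);
                Console.WriteLine($"Delivered {results.Length} messages");
            }
            catch (ProduceException<Null, string> e)
            {
                // await surfaces only the first failure.
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
```

Awaiting the combined task rethrows only the first `ProduceException`; if you need every failure, inspect `Exception.InnerExceptions` on the task returned by `Task.WhenAll` instead of relying on the awaited exception.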

The only value I see in a batching API is that it would cut down on the number of P/Invoke calls, which may give a measurable performance improvement in some scenarios. But the client isn't slow (in some scenarios it beats the Java client), and as @edenhill says, I think providing a batch API would do more to mislead than to add value.

Thanks for the feedback @mhowlett! I had it in my head somehow that calls to ProduceAsync would lead to one round-trip to Kafka per message, which is why I went nuts with the code above to avoid it. I see now that I misunderstood.

Anyone else coming along: you should definitely do what @mhowlett recommends just above (i.e. using Task.WhenAll...) rather than using my ProduceBatch method (which is _way_ overcomplicated). I tested his approach in my code and it seems to work fine. I suppose one might get undue CPU pressure from task-switching for very large batches, but in testing with batches of 10K simple string messages, it didn't seem too burdensome.

My ConsumeBatch method, on the other hand, has been working great for us in production use for a while now. I did modify it a bit to take a CancellationToken instead of a CancellationTokenSource but otherwise it remains as written above. Definitely open to any ideas or criticisms about it though!
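The modified, CancellationToken-based version isn't shown in the thread, so the following is only a sketch of what that change could look like; the class name `KafkaConsumeBatchExtensions` is hypothetical. It also recomputes the remaining wait budget once per iteration and uses the list's `Count` instead of a separate counter. Use it instead of the original overload rather than alongside it: with both in scope, a call such as `consumer.ConsumeBatch(timeout, 100)` would be ambiguous.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

namespace Utilities.Kafka
{
    public static class KafkaConsumeBatchExtensions
    {
        // As with the original, prefer consumers configured with enable.auto.offset.store=false,
        // since an exception part-way through can leave consumed messages unseen by the caller.
        public static IReadOnlyList<ConsumeResult<TKey, TVal>> ConsumeBatch<TKey, TVal>(
            this IConsumer<TKey, TVal> consumer, TimeSpan maxWaitTime, int maxBatchSize,
            CancellationToken cancellationToken = default)
        {
            var deadline = DateTime.UtcNow + maxWaitTime;
            var batch = new List<ConsumeResult<TKey, TVal>>();

            // Poll until the batch is full or the wait budget is used up.
            while (batch.Count < maxBatchSize)
            {
                cancellationToken.ThrowIfCancellationRequested();

                var remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero)
                {
                    break;
                }

                // Returns null if nothing arrives within the remaining time.
                var result = consumer.Consume(remaining);
                if (result != null && !result.IsPartitionEOF)
                {
                    batch.Add(result);
                }
            }

            return batch;
        }
    }
}
```

Taking a `CancellationToken` lets callers pass a token they don't own (for example one supplied by a host), whereas a `CancellationTokenSource` parameter also hands the method the ability to cancel.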

Thanks @edenhill, @mhowlett and @woodlee.

Based on your feedback I can see why a batching API is bad and may lead to bad assumptions. I assume I had the same idea as @woodlee, thinking that each ProduceAsync call would cause one round-trip to Kafka per message*.

For now we do not need transactions or a strong guarantee that messages arrive in the correct order. I'll start looking into @mhowlett's example to see if this meets our requirements.

_*: During my tests I noticed high latency when sending all messages to Kafka. I thought this was due to an incorrect way of batching my messages; later I found out that my configuration was to blame._

@edenhill, right now the ProduceAsync API doesn't reflect the way messages are sent to the broker under the hood.
@mhowlett's suggestion is fine, but I doubt it's going to be as efficient as a RecordSet with many messages if you take into account the entire data pipeline (producer -> broker -> consumer).

librdkafka - which exposes a message-by-message API - outperforms the Java client across the board. The .NET client is constrained by the API choices in librdkafka. There are some additional inefficiencies in .NET (in particular, Tasks add a lot of overhead if you use that variant to produce), but on the whole it's very fast and outperforms the Java client in some parts of the parameter space. It would be possible to get higher performance by exposing the MessageSet abstraction to the user, but this would also expose the user to lower-level concerns. Also keep in mind that in most practical scenarios the performance of the client is not the bottleneck: it's either the network (a single client can easily saturate most network interfaces) or something related to the application (often CPU).

@mhowlett, so the constraint is at librdkafka's level? I read the docs for 0.11.6, which we use (https://github.com/edenhill/librdkafka/blob/v0.11.6/INTRODUCTION.md), and it seems that support for batching is present (section "High throughput"):
... test [0]: MessageSet with 1514 message(s) delivered

Yes, work would need to be done in librdkafka to expose batching at the API level, but we haven't been convinced of the value in doing that (in fact we believe it may be dangerous, as it may promote less than ideal usage).

The section you refer to in the docs is about batching in protocol requests. librdkafka abstracts this away from you, except for the properties (most commonly linger.ms) that control its behavior.

@mhowlett, sorry, I'm afraid I haven't stated my problem clearly.
I'd like to achieve batching at the protocol level, but so far I've been unable to do so with the 0.11.6 client.
I tried several values of linger.ms, with the same outcome every time:

7|2019-10-07 09:59:50.892|rdkafka#producer-1|PRODUCE| [thrd:my_kafka_hostname:9092/0]:my_kafka_hostname:9092/0: topic_name.test [0]: Produce MessageSet with 1 message(s) (1201 bytes, ApiVersion 3, MsgVersion 2)
7|2019-10-07 09:59:50.895|rdkafka#producer-1|MSGSET| [thrd:my_kafka_hostname:9092/0]: my_kafka_hostname:9092/0: topic_name.test [0]: MessageSet with 1 message(s) delivered

The client will definitely batch messages to the broker (all versions).

If you experience the same problem with the latest version, please provide the full code of a program demonstrating the issue and we can comment further.

Will do


--
Yours faithfully,
Gleb

@mhowlett, I updated to version 1.2.0 and got the desired result using the await Task.WhenAll(...) approach, with the linger.ms and batch.num.messages settings explicitly set.

