Confluent-kafka-dotnet: Consume multiple messages

Created on 16 Jan 2020 · 7 Comments · Source: confluentinc/confluent-kafka-dotnet

Question:
Is it possible to read multiple messages (or a stream of bytes) from a Kafka topic at once?

Right now I can't find any information about consuming a byte array or multiple messages at once, and consuming each message individually takes a lot of time.

If this is not possible, what would be the best way to consume a lot of data (50 GB) each day?

All 7 comments

The API gives you messages one at a time, but they come from an internal queue on the client, and behind the scenes there is a lot going on to ensure high throughput from the brokers. The client will very easily handle 50 GB/day (this is a small amount of data in Kafka terms).

You can collect messages in an in-memory list and process them every x seconds. We use a timer and trigger processing of the collected messages when it elapses. Our motivation for batching is to perform DB operations in batches.
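The buffering idea above could be sketched roughly as follows. This is a minimal illustration, not the commenter's actual code: `BatchBuffer<T>` and all of its members are hypothetical names, and instead of a timer it checks the age of the oldest buffered item each time the consume loop polls it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Confluent.Kafka): collects items and
// reports when a batch is due, either by size or by age of the oldest item.
public sealed class BatchBuffer<T>
{
    private readonly int _maxSize;
    private readonly TimeSpan _maxAge;
    private readonly Func<DateTime> _clock;
    private readonly List<T> _items = new List<T>();
    private DateTime _firstAddedAt;

    // The clock parameter is only there so the age check can be tested.
    public BatchBuffer(int maxSize, TimeSpan maxAge, Func<DateTime> clock = null)
    {
        if (maxSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        _maxSize = maxSize;
        _maxAge = maxAge;
        _clock = clock ?? (() => DateTime.UtcNow);
    }

    public int Count => _items.Count;

    public void Add(T item)
    {
        // Remember when the current batch started filling.
        if (_items.Count == 0) _firstAddedAt = _clock();
        _items.Add(item);
    }

    // True when the buffer is full or its oldest item has waited at least maxAge.
    public bool IsDue =>
        _items.Count >= _maxSize ||
        (_items.Count > 0 && _clock() - _firstAddedAt >= _maxAge);

    // Returns a copy of the buffered items and resets the buffer.
    public IReadOnlyList<T> Drain()
    {
        var batch = _items.ToArray();
        _items.Clear();
        return batch;
    }
}
```

In a consume loop, one would presumably call Consume with a short timeout, Add each non-null result, and, whenever IsDue is true, Drain the buffer, run the batch DB operation, and only then commit offsets.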

@mhowlett Any plans for adding a ConsumeBatch method to IConsumer? If not, can you validate the implementation provided below? It is based on the assumption that consumer.Consume(TimeSpan.Zero) will not call the broker but will only check whether there is something on the internal queue (which does not involve any IO-bound operation) and return a message from the internal queue, or null, immediately. Is that assumption correct, and if so, can it change in the future in a way that breaks this code?

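using System;
using System.Collections.Generic;
using Confluent.Kafka;

internal static class ConsumerExtensions
{
    // Returns up to maxBatchSize messages: waits up to consumeTimeout for the first one,
    // then takes whatever is already in the client's internal queue without waiting.
    public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TimeSpan consumeTimeout, int maxBatchSize)
    {
        var message = consumer.Consume(consumeTimeout);

        // A null result means the timeout expired; a null Message means a partition EOF event.
        if (message?.Message is null)
            return Array.Empty<ConsumeResult<TKey, TValue>>();

        var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };

        while (messageBatch.Count < maxBatchSize)
        {
            // TimeSpan.Zero: return immediately if the internal queue is empty.
            message = consumer.Consume(TimeSpan.Zero);
            if (message?.Message is null)
                break;

            messageBatch.Add(message);
        }

        return messageBatch;
    }
}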

Yep, that will work (yes, Consume reads from an internal queue, and broker fetch requests happen in background threads).

What is your use case for requiring a batch of messages?

A batch consume requirement is not a super common use case in our system, but it appears in two places. The first is when we want to do a batch update on the database based on multiple messages rather than going message by message. The second is when we replicate a topic from one Kafka cluster to a second Kafka cluster in a different AWS region. When replicating, we would like to consume a batch and produce a batch, as that seems to be optimal performance-wise.

An additional question about consumer.Consume(timeout): when the timeout is greater than zero and there are already messages in the internal queue (filled by the background thread), will it return immediately with whatever is already in the queue, or will it use the provided timeout to try to gather more messages? I think I already know the answer but want to double-check.

> The first is when we want to do a batch update on the database based on multiple messages rather than going message by message.

Makes sense.

> The second is when we replicate a topic from one Kafka cluster to a second Kafka cluster in a different AWS region. When replicating, we would like to consume a batch and produce a batch, as that seems to be optimal performance-wise.

Both the producer and consumer batch behind the scenes (and this behavior is configurable). I don't think you gain anything from doing this yourself as well.
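As a rough illustration of where that built-in batching is configured, the fragment below sets a few of the related client properties. The property names are real `ConsumerConfig` / `ProducerConfig` members; the class name, broker address, group id, and all numeric values are placeholders chosen for the example, not recommendations from this thread.

```csharp
using Confluent.Kafka;

// Illustrative only: every value below is an assumption, tune for your workload.
internal static class BatchingConfigSketch
{
    public static ConsumerConfig Consumer() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder address
        GroupId = "example-group",           // placeholder group id
        FetchMinBytes = 64 * 1024,           // broker waits for this much data per fetch...
        FetchWaitMaxMs = 100,                // ...or at most this long
        QueuedMinMessages = 100000           // messages the client tries to keep pre-fetched
    };

    public static ProducerConfig Producer() => new ProducerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder address
        LingerMs = 20,                       // wait up to 20 ms to fill a batch before sending
        BatchNumMessages = 10000             // upper bound on messages per batch
    };
}
```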

> An additional question about consumer.Consume(timeout): when the timeout is greater than zero and there are already messages in the internal queue (filled by the background thread), will it return immediately with whatever is already in the queue, or will it use the provided timeout to try to gather more messages?

It will return immediately. Fetching of messages from the broker happens in background threads, independently of calls to the Consume method.

In this replication use case we need to guarantee at-least-once delivery and unchanged ordering. We produce with Acks.All (min in-sync replicas 2), MaxInFlight 1, and high MessageTimeoutMs and MessageSendMaxRetries. We essentially can't produce the next message until the current one is confirmed as committed by the broker. If we treated it more like batches, we could potentially at least parallelize per partition, since ordering between partitions is not guaranteed.
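For reference, the producer settings described above would look roughly like this as a `ProducerConfig`. The property names come from the comment itself; the class name, broker address, and the concrete timeout and retry values are assumptions, since the thread only says "high".

```csharp
using Confluent.Kafka;

// Sketch of the replication producer settings described above.
internal static class ReplicationProducerConfigSketch
{
    public static ProducerConfig Create() => new ProducerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder address
        Acks = Acks.All,                     // wait for all in-sync replicas to acknowledge
        MaxInFlight = 1,                     // one outstanding request, so retries cannot reorder
        MessageTimeoutMs = 300000,           // assumed value for "high"
        MessageSendMaxRetries = 1000000      // assumed value for "high"
        // min.insync.replicas = 2 is a broker/topic setting, not a client property.
    };
}
```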
