Confluent-kafka-dotnet: Question: Multiple consumer instances

Created on 25 May 2017  ·  17 Comments  ·  Source: confluentinc/confluent-kafka-dotnet

Hi,

When consuming from multiple topics inside a single application, or consuming from topics that have multiple partitions, are there any recommended ways of using the Consumer class to maximise consumption throughput?

The way we've approached this is to create a new instance of the Consumer for each topic/partition to be consumed from, and then to start them all polling and consuming. Are there any issues with this approach?

Are there any other ways of optimising this?

Thanks in advance :)

All 17 comments

To maximize throughput, it's better to have a single consumer instance per process. This is harder to achieve than with the producer because of the subscription strategy.

You should have a single consumer that subscribes to all the topics you want and polls in a dedicated thread. Then, when you receive a message, hand it off to worker tasks/threads.
With multiple consumers, you duplicate connections to the brokers, add overhead from the extra TCP requests, and prevent the broker from batching as much data as it could.

If a topic has multiple partitions, then for higher throughput, should we have multiple consumers for optimization reasons?

Having multiple consumers to split up the partitions is useful if you have multiple machines, with one consumer per machine. There is no benefit to doing this in a single process; you will just lower the throughput.
For throughput, you may have to tweak some TCP options, such as disabling Nagle's algorithm.
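
For reference, librdkafka exposes Nagle's algorithm through the `socket.nagle.disable` property. A minimal sketch of setting it, assuming the Confluent.Kafka 1.x `ConsumerConfig` class (newer than the client discussed at this point in the thread); the broker address and group id are placeholders, not values from this issue:

```csharp
using Confluent.Kafka;

// Assumed placeholder values: broker address and group id are illustrative only.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group"
};

// librdkafka property that sets TCP_NODELAY on broker sockets (off by default).
config.Set("socket.nagle.disable", "true");
```

Whether this helps depends on the workload, so it is worth measuring before and after.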

Thanks very much, we'll change to using a single consumer and see how that goes. Appreciate the help.

I have a topic with 12 partitions. I have disabled auto commit to make sure I only commit once a message has been processed successfully. For me, both reliability and throughput are important. Even in this case:

A. Is having a single consumer instance the better option?

or

B. Should I plan to have 12 consumer instances in the same consumer group, each running on a separate thread in the same process? That way a delay in message processing on one consumer instance does not block the rest of the messages.

A. Have a single consumer and 12 worker threads/tasks. A single thread polls in the background. When you receive a message, you redirect it to the corresponding worker thread/task (for example, you push it onto a dedicated queue that another thread consumes). Each worker then works independently and won't block the other workers or the polling thread.

Also note that you will have to enable some configuration settings to get good throughput on Windows (you can find relevant discussions of this in the librdkafka project).
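
The single-poller, one-queue-per-worker layout described above can be sketched roughly as follows. This is only an illustration, not code from the library: `Msg` and `PartitionDispatcher` are hypothetical names, the `source` sequence stands in for the real polling loop, and the queue capacity of 100 is arbitrary. It uses only `System.Threading.Channels` so it runs without a broker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a consumed Kafka message (not a Confluent.Kafka type).
public sealed record Msg(int Partition, long Offset, string Value);

public static class PartitionDispatcher
{
    // Routes each message to the worker that owns its partition, so a slow
    // partition does not block the others and per-partition order is preserved.
    // Returns the offsets handled per partition, in processing order.
    public static async Task<IReadOnlyDictionary<int, List<long>>> RunAsync(
        IEnumerable<Msg> source, int workerCount, Func<Msg, Task> handler)
    {
        var processed = new ConcurrentDictionary<int, List<long>>();
        var queues = new Channel<Msg>[workerCount];
        var workers = new Task[workerCount];

        for (var i = 0; i < workerCount; i++)
        {
            // Bounded queue: the polling side waits when a worker falls behind.
            var queue = Channel.CreateBounded<Msg>(100);
            queues[i] = queue;
            workers[i] = Task.Run(async () =>
            {
                await foreach (var msg in queue.Reader.ReadAllAsync())
                {
                    await handler(msg);
                    // A partition always maps to the same worker, so each list
                    // is only ever touched by one task.
                    processed.GetOrAdd(msg.Partition, _ => new List<long>()).Add(msg.Offset);
                }
            });
        }

        // Stand-in for the single polling thread; a real loop would read from the consumer.
        foreach (var msg in source)
            await queues[msg.Partition % workerCount].Writer.WriteAsync(msg);

        // Signal end of input and wait for the workers to drain their queues.
        foreach (var q in queues) q.Writer.Complete();
        await Task.WhenAll(workers);
        return processed;
    }
}
```

With manual commits, the point where the offset is recorded here would be a natural place to store or commit the offset, since by then the handler has finished successfully.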

Thank you @treziac, I will try the single consumer option

I am using Reactive Extensions for .NET to notify subscribers. Do you have an example where a main thread polls messages from Kafka and sends them to a worker queue that is drained by multiple threads?

Hi @treziac, we've started using a single consumer to poll and hand off to queues. What I'm noticing is that when calling Poll() on the consumer multiple times, all messages from the first partition are returned, then messages from the next partition, then the next, and so on.

E.g. if I have 100 partitions assigned to the consumer, then I'm receiving all messages from partition 0, then all messages from partition 1, and so on.

Is this expected?

Thanks.

Message fetches from the broker are batched for efficiency, and I expect you're just seeing an artifact of that. If you had more messages, you would see that not all messages from a particular partition are retrieved before moving to another. If you play with the fetch.message.max.bytes config parameter, you could reduce the batch sizes (but you probably don't want or need to).

Kafka guarantees message ordering per partition; it says nothing about relative ordering between partitions.
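
For anyone who does want to experiment with the fetch size mentioned above, a minimal sketch, assuming the Confluent.Kafka 1.x `ConsumerConfig` class; the broker address, group id, and the 64 KiB value are arbitrary placeholders:

```csharp
using Confluent.Kafka;

// Assumed placeholder values: broker address and group id are illustrative only.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group"
};

// Per-partition fetch cap in bytes; librdkafka's default is 1048576 (1 MiB).
// A smaller value means smaller batches per partition, at some cost in throughput.
config.Set("fetch.message.max.bytes", "65536");
```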

Thanks @mhowlett, that makes sense, and yes, over time I am seeing messages from other partitions gradually mixed in. I noticed the behaviour most when using the advanced consumer sample project to consume from a topic, which was mostly why I wondered if it was expected. I'll close this issue off now. Thanks for all your help.

@vinodres: I have the same kind of use case, where I need a single consumer instance feeding multiple threads/tasks. I am also currently running multiple consumer instances with multiple tasks. Were you able to get a single instance working with multiple tasks? Could you provide some code snippets for reference?

I'm trying to implement a consumer with the new version of the Confluent.Kafka package (1.3.0).
Can anyone suggest how to implement Consume, given that the new version doesn't have OnMessage and Poll? This is what I had with the old version:

public IObservable<Message<Null, string>> Consume(CancellationToken cancellationToken)
{
    var observable = Observable.FromEventPattern<Message<Null, string>>(
                            x =>
                                {
                                    _consumer.OnMessage += x;
                                    _consumer.Subscribe(_topics);
                                },
                            x =>
                                {
                                    _consumer.Unsubscribe();
                                    _consumer.OnMessage -= x;
                                })
                        .Select(x => x.EventArgs);

    Task.Factory.StartNew(
            () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        _consumer.Poll(TimeSpan.FromMilliseconds(100));
                    }
                },
            cancellationToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default)
        .ConfigureAwait(false);

    return observable;
}

Is there an example of this?
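
One possible shape for this, sketched under assumptions rather than taken from the library: in the 1.x API the consume call returns the message directly instead of raising OnMessage, so the event plus Poll pair can be replaced by a loop that calls a blocking poll delegate and forwards each result. `PollLoop` is a hypothetical helper name, and the poll call is passed in as a delegate so the sketch runs without a broker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollLoop
{
    // Runs a blocking poll delegate on a dedicated thread and forwards each
    // non-null result to onMessage until the token is cancelled.
    public static Task Run<T>(
        Func<TimeSpan, T> poll,
        Action<T> onMessage,
        CancellationToken cancellationToken) where T : class
    {
        return Task.Factory.StartNew(
            () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    // A null result means the poll timed out with nothing to deliver.
                    var result = poll(TimeSpan.FromMilliseconds(100));
                    if (result != null) onMessage(result);
                }
            },
            CancellationToken.None, // let the loop exit on its own so the task completes normally
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
}
```

With Confluent.Kafka 1.x, `poll` could be `timeout => _consumer.Consume(timeout)`, which returns null when the timeout elapses, after calling `_consumer.Subscribe(_topics)`. For the Rx case, `onMessage` could be the `OnNext` method of a `Subject<ConsumeResult<Null, string>>` from System.Reactive, with the subject exposed as the observable. Both of those wirings are assumptions about the surrounding code, not something confirmed in this thread.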

When you receive a message, you redirect it to the corresponding working thread/Task

I have subscriptions to 4 topics; how do I process them in parallel?

We don't have an example of that yet.

We don't have an example of that yet.

How about now? Can we have a small example? Thanks.

I used something like this:

<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />

public void Consume()
{
    // provides asynchronous message processing for different topics, but synchronous within each topic
    var pipeline = new Dictionary<string, ActionBlock<KafkaEventMessage>>();
    foreach (var subscription in _consumer.Subscription)
    {
        pipeline.Add(subscription, new ActionBlock<KafkaEventMessage>(async msg =>
        {
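            await ProcessEvent(msg);
            // Store the offset only after processing succeeds. This assumes
            // enable.auto.offset.store=false, so offsets are not stored automatically.
            _consumer.StoreOffset(msg.ConsumeResult);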
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
    }

    try
    {
        // start Poll Loop
        while (true)
        {
            KafkaEventMessage eventMsg = _consumer.Consume().ToEventMessage();
            pipeline[eventMsg.Topic].SendAsync(eventMsg).Wait();
        }
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Kafka consumer will be restarted due to non-retryable exception: {Message}", ex.Message);
        throw;
    }
}