Confluent-kafka-dotnet: Consume Kafka Message Asynchronously

Created on 29 Mar 2018  ·  28 Comments  ·  Source: confluentinc/confluent-kafka-dotnet

Description

Based on the example, we need to wrap the Consume method in a while loop. Is it possible to consume the message asynchronously?

Checklist

Please provide the following information:

  • [x] Confluent.Kafka nuget version: 0.11.3
  • [ ] Apache Kafka version:
  • [ ] Client configuration:
  • [x] Operating system: Windows
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue
question

All 28 comments

you need to implement a poll loop yourself. you can of course encapsulate this up and set it running in a separate thread though if you like.

do you have an API in mind? feel free to jot down some pseudo code if you have a concept for how this should work. there is certainly scope for providing a consumer at a higher level of abstraction.

I might have misunderstood what you mean by 'asynchronously' above. The Consumer will provide a ConsumeAsync method in version 1.0.

Consumer's ConsumeAsync is still missing from the latest version (1.0.1.1). When will this be available?

it's not on the near-term roadmap, but we still plan to add it.

Any updates regarding the ConsumeAsync method? Thanks

it's not on the near-term roadmap, but we still plan to add it.

We don't view it as important as closing the gap on features with java, for example adding transactions. If there's a reason you view this as critical, feel free to explain.

Hi, I'm currently developing a consumer Web API in .NET Core 2.2 that is wired up via dependency injection, but the library seems to lack the ConsumeAsync method needed to do that. I saw that you have shared an example of using ProduceAsync via DI in an API controller. I also saw that your Consume example runs in a loop, which makes it hard to consume messages asynchronously. Any tips for implementing a consumer Web API?

ok, sounds like a REST proxy type thing? if the consumer poll is driven by calls to the http handler method then yes, this will take a bit of effort to set up properly. suggest first doing a consume call with 0 timeout which won't block. if that doesn't return a message that's where it gets tricky. you could Task.Run a blocking Consume call and await that, but if you expect high load you'll want a better solution, which would be to have a single thread completing the tasks (similar to how the producer works). this is what we will implement, but we can do it better if we make some modifications to librdkafka, which is a bigger effort, which is why we haven't done it yet.
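A minimal sketch of the interim approach described above: try a zero-timeout poll first, and only fall back to Task.Run with a blocking call when nothing is ready. The names PollHelper and PollOnceAsync are hypothetical, and the poll delegate is a stand-in for a blocking consume call that is assumed to return null on timeout (for example t => consumer.Consume(t) in Confluent.Kafka 1.x). It is not the single-completion-thread design mentioned for high load.

```csharp
using System;
using System.Threading.Tasks;

public static class PollHelper
{
    // 'poll' is a hypothetical stand-in for a blocking consume call that
    // returns null when the timeout elapses, e.g. t => consumer.Consume(t).
    // Assumption: only one call is in flight at a time for a given consumer.
    public static async Task<T> PollOnceAsync<T>(
        Func<TimeSpan, T> poll, TimeSpan maxWait) where T : class
    {
        // Fast path: a zero timeout returns immediately, message or not.
        T result = poll(TimeSpan.Zero);
        if (result != null)
        {
            return result;
        }

        // Slow path: block a thread-pool thread instead of the caller.
        // This still ties up one thread for up to 'maxWait'.
        return await Task.Run(() => poll(maxWait));
    }
}
```

An HTTP handler could then await PollHelper.PollOnceAsync(...) and treat a null result as "no message available yet". Under heavy load each waiting request still occupies a pool thread, which is the limitation noted above.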

I have exactly the same requirement for a ConsumeAsync.

I really, really need ConsumeAsync so that I don't block the main thread. 😅
When there is news about this implementation, please let us know.

@mhowlett would it make sense to re-open this issue since it was not yet implemented?

@mhowlett Has there been any movement on ConsumeAsync?

it's not a near term priority explicitly, but we haven't forgotten about it.

Hi, as @rknidzam asked before, I am also having trouble finding the best way to implement a consumer application. In order to use the models in my API, I am planning to run the consumer as a BackgroundService and add it to the hosted services. But I am not sure how this will affect API performance, and I don't know how I can scale consumers horizontally, independently of the API. I am using .NET Core. Do you have any advice on best practice for implementing a consumer in .NET Core in a microservice architecture?

Hi, is there any update on this?

In terms of putting the Consume operation on a separate thread, it's fairly easy to move it to a new thread using Task.Run(() => { while (true) { var c = _consumer.Consume(); ... } });

However, the real problem for me is that (assuming I understand correctly) the blocking version of Consume will hog a thread, regardless of whether that is the main thread or a worker thread created using Task.Run - if we therefore have a service that is subscribing to hundreds or thousands of topics, we risk starving the thread pool.

Having a ConsumeAsync function that supports the TPL (ie makes use of async/await) will allow threads to not be blocked and be reused, avoiding this issue. For me, this is the main reason a proper ConsumeAsync method is desired.

generally, you should be only using one consumer per process (you definitely shouldn't be creating thousands because they're very expensive, in fact, each consumer utilizes multiple threads behind the scenes). the impact of an additional thread that's not doing anything in the case the consume call is blocking will be insignificant. note that a single consumer is capable of delivering hundreds of thousands of messages to your application per second.

@mhowlett you noted that a single consumer is capable of delivering hundreds of thousands of messages per second to your application. Yes, you are right. But what can I do if processing a message takes more than 20 minutes? One option, I think, is to increase the number of consumers in the consumer group. But that is limited too: as you know, if we have 12 partitions, we can only run 12 consumers under one group ID. So what do you suggest?

do your processing on different threads (the details of this are beyond the scope of what i can provide here). you may need to pause the consumer in order to allow you to call consumer.Consume frequently enough to avoid the max.poll.interval.ms timeout being hit.
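A rough sketch of that suggestion, under stated assumptions: the poll thread hands each message to a worker task, pauses the consumer, and keeps polling until the work is done so that max.poll.interval.ms is not exceeded. PausingLoop is a hypothetical name. The poll, pause and resume delegates are stand-ins; with Confluent.Kafka 1.x they might be t => consumer.Consume(t), () => consumer.Pause(consumer.Assignment) and () => consumer.Resume(consumer.Assignment), but that wiring is an assumption and is not tested here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PausingLoop
{
    public static void Run<T>(
        Func<TimeSpan, T> poll,   // stand-in for a blocking consume with timeout
        Action pause,             // stand-in for pausing all assigned partitions
        Action resume,            // stand-in for resuming them
        Func<T, Task> process,    // the slow, application-specific work
        TimeSpan pollInterval,
        CancellationToken ct) where T : class
    {
        while (!ct.IsCancellationRequested)
        {
            T msg = poll(pollInterval);
            if (msg == null) continue;

            // Stop fetching, then run the slow work off the poll thread.
            pause();
            Task work = Task.Run(() => process(msg));

            // Keep calling poll while the work runs. A paused consumer is
            // assumed to return null here; this sketch treats anything else
            // as an error rather than silently dropping a message.
            while (!work.IsCompleted)
            {
                if (poll(pollInterval) != null)
                    throw new InvalidOperationException("Received a message while paused.");
            }

            work.GetAwaiter().GetResult(); // rethrow any processing failure
            resume();
        }
    }
}
```

This processes one message at a time, so it keeps ordering simple but gives no parallelism. Running several worker tasks per batch would need extra care around offsets and rebalances, which is outside this sketch.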

Thanks for your answer. I've used the code below:
Parallel.ForEach(new List<ConsumeResult<Ignore, string>> { consumer.Consume(cts.Token) }, async consumerResult => { // Process Messages });

interesting idea. you won't get partition/thread affinity, but i don't immediately see a problem with it.

@kauanschumacher Try running the consumer in a background thread, it's a best practice anyway. (for instance in an aspnet app invoke it via HostedService etc..)

Can we please raise the priority on ConsumeAsync? This is simply the standard way of doing things in C# now and I am getting really tired of having to explain why we have to find some hack around the lack of this functionality every time.
What can we do to raise the priority on this? This has been open for more than 2 years now?

Agree with @HassanHashemi that a HostedService is the best option for consuming messages from Kafka. The only caveat is that the calling thread is blocked until there are new messages to deliver to the consumer. This can be worked around with await Task.Yield().

You can find more details in my article about Kafka Consumer API

There is no such thing as an easy hack to wrap Consume() and turn it into an asynchronous call.

This is a great question which really isn't explained well in most texts about C# and async.

I searched for this for ages, thinking I could and maybe should be implementing my own async I/O methods. If a method or library I was using didn't have async methods, I thought I should somehow wrap those functions in code that made them asynchronous. It turns out that this isn't really feasible for most programmers. Yes, you can spawn a new thread using new Thread(() => {...}).Start() or use Parallel.ForEach, and that does make your code asynchronous, but it also creates a new thread, which is an expensive overhead for asynchronous operations. It can certainly free up your UI thread to ensure your app stays responsive, but it doesn't create a truly async operation the way that ProduceAsync() is a truly asynchronous operation.

This is because async methods in the .net libraries use something called "standard P/Invoke asynchronous I/O system in .NET" to call low level OS code that doesn't require a dedicated CPU thread while doing outbound IO (networking or storage). It actually doesn't dedicate a thread to its work and signals the .net runtime when it's done doing its stuff.

I'm not familiar with the details but this knowledge is enough to free me from trying to implement async I/O and make me focus on using the async methods when available.

As @mhowlett commented above "each consumer utilizes multiple threads behind the scenes".

behind the scenes, Consume hits a librdkafka queue (does not wait on io). and since librdkafka doesn't use a thread pool / async framework - instead has a collection of long running threads for specific purposes - i think that implementing an async consume will cost another thread, however you do it.
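To illustrate the "costs another thread" point, here is a hedged sketch of one way an application could build an awaitable consume on top of a blocking one: a single dedicated thread runs the blocking call and pushes results into a bounded channel that callers await. ChannelPump and its ConsumeAsync method are hypothetical names, not part of the library, and blockingConsume is a stand-in for something like ct => consumer.Consume(ct).

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class ChannelPump<T> where T : class
{
    private readonly Channel<T> _channel;

    public ChannelPump(
        Func<CancellationToken, T> blockingConsume, // stand-in for the blocking call
        int capacity,
        CancellationToken ct)
    {
        _channel = Channel.CreateBounded<T>(capacity);

        // The one extra thread: everything that touches the consumer
        // stays on it, so the single-threaded ordering is preserved.
        var thread = new Thread(() =>
        {
            try
            {
                while (!ct.IsCancellationRequested)
                {
                    T msg = blockingConsume(ct);
                    if (msg == null) continue;

                    // Bounded channel: wait here when readers fall behind.
                    // Note that no polling happens during that wait.
                    _channel.Writer.WriteAsync(msg, ct).AsTask().GetAwaiter().GetResult();
                }
                _channel.Writer.TryComplete();
            }
            catch (OperationCanceledException) { _channel.Writer.TryComplete(); }
            catch (Exception e) { _channel.Writer.TryComplete(e); }
        })
        { IsBackground = true };
        thread.Start();
    }

    // Awaiting this does not block the caller's thread.
    public ValueTask<T> ConsumeAsync(CancellationToken ct = default)
        => _channel.Reader.ReadAsync(ct);
}
```

Callers can then write var msg = await pump.ConsumeAsync(); without blocking, but the dedicated thread is still there, which matches the observation above. Once the pump stops, ReadAsync throws ChannelClosedException after the buffered items are drained.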

the other complication here is ordering considerations. currently everything in the consumer api happens on a single thread, which is simple. as soon as we start allowing things to continue on different threads, we need to consider the implications of that. there may not be much to worry about, but it's something that needs to be thought through.

note that there's a relatively new example: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/RequestTimeConsumer.cs that demonstrates how to use the Consumer in a BackgroundService (which implements IHostedService).

Any updates regarding the ConsumeAsync method? Thanks

It's likely we won't ever do this due to inherent limitations with the implementation.

However, I think there is a higher level (reactive) consumer API that we should build that is applicable to pretty much all use cases - the existing consumer API is lower level than most people need. We could/would make all methods on the higher level API async. So we would have 2 APIs - one that is high level and idiomatic, and a lower level one, which you wouldn't need to use unless you had an advanced use case, in which case it's less of an issue to require people understand how to integrate sync and async code well.
