Create the method
`async Task<Message<TKey,TValue>> Consumer<TKey,TValue>.ConsumeAsync(TimeSpan timeout, CancellationToken ct)`
with the following behavior:
This would allow both pull-based (IEnumerable<Message<TKey,TValue>>) and push-based (IObservable<Message<TKey,TValue>>) accessors to be built without individual developers having to reimplement a correct polling loop for every project.
Having recently implemented both of these patterns myself, I can say this method would have made my job significantly easier, and it would also give peace of mind that the error handling is being done correctly.
This method should replace Consumer<TKey,TValue>.Consume, which duplicates its functionality but is less flexible.
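A minimal sketch of the pull-based accessor described above, assuming only a timeout-based consume call that returns null when nothing arrives within the timeout (the shape of `Consume(TimeSpan)` as discussed in this thread). `ConsumeAll` and its delegate parameter are hypothetical names for illustration, not the library's API; an in-memory queue plays the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

#nullable enable

// Hypothetical pull-based accessor: wraps a timeout-based consume call
// (assumed to return null when nothing arrives in time) in an IEnumerable
// that keeps polling until the token is cancelled.
static IEnumerable<T> ConsumeAll<T>(Func<TimeSpan, T?> consume, TimeSpan pollTimeout, CancellationToken ct)
    where T : class
{
    while (!ct.IsCancellationRequested)
    {
        T? message = consume(pollTimeout);
        if (message != null)
        {
            yield return message; // only real messages reach the caller
        }
        // null means "nothing within pollTimeout": poll again
    }
}

// Usage with an in-memory stand-in for the consumer.
var pending = new Queue<string>(new[] { "a", "b", "c" });
using var cts = new CancellationTokenSource();
var received = new List<string>();
foreach (var msg in ConsumeAll<string>(
    _ => pending.Count > 0 ? pending.Dequeue() : null,
    TimeSpan.FromMilliseconds(100),
    cts.Token))
{
    received.Add(msg);
    if (pending.Count == 0) cts.Cancel(); // stand-in drained: stop polling
}
Console.WriteLine(string.Join(",", received));
```

A push-based (`IObservable`) accessor could be layered on the same loop by calling `OnNext` for each yielded message.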
Thanks for this insight, @BenjaminHolland. I expect we'll want to try to do this.
In terms of priority, I think there are a few things we want to do on the Producer first.
Do you have an issue number? I've mostly been focusing on the consumer side of things, but I'd be interested in helping out with the producer as well.
After digging down into Poll, it seems like you could get away with simply not removing Consume for the time being. It's not very idiomatic C#, but it does provide almost all the functionality these patterns require.
It seems that the real problem I've had with Poll is that there's no good way to associate a given poll request with a specific "completion" idiom, other than very carefully synchronizing calls to Poll with subscriptions to OnError, OnConsumeError, and OnMessage. It's also not clear, without digging into the source, that Poll triggers the events on the same thread it's called on, which is important information for avoiding deadlocks. Consume implicitly associates its invocation with both the thread it's called on and the messages and errors it produces, which is why it's a much better fit than Poll in a lot of situations.
I think 2 shouldn't be hard to get in and there is a lot of demand for it. The main sticking point for 2 or 3 is going to be convincing me we've worked out the nicest API possible. If you have opinions on what the producer API should look like that would definitely be highly valued (given your comments above). Some old discussion on that is in #118.
If you open a PR for ConsumeAsync, I'll review it. I haven't thought about this in depth, but I suspect the constraint around guaranteeing order may get tricky.
oh, and thanks in advance for any further contributions :-)
Note regarding your edits: all events happen as a side effect of the Consume or Poll methods, on the thread that calls them. Consume and Poll do almost the same thing.
yes - this was for anyone else reading along :-)
I've implemented ConsumeAsync on a local branch and it's coming in version 1.0; thanks for your feedback. It will be included in the 1.0-experimental-6 NuGet package. Poll/OnMessage/OnConsumeError are being removed; you're right that they cause too much confusion.
Sweet. I'll check this out when I get home.
@mhowlett Hi, is this feature implemented? I could not find it in 1.0.0-RC1.
Any news on this? It doesn't seem to have been included in 1.0.0 after all. Do you plan on adding it later, or did you just give up?
It will be added; the 1.0 API was designed in anticipation of it.
Notes:
The ConsumeAsync API is not great for concurrent use, because the order in which consume result Tasks are serviced will be undefined (they complete on thread pool threads). As a result, it will be easy for offset-commit logic to have subtle problems that many people are going to miss.

Thank you for the fast reply! I see and understand your points.
It would still be amazing if Consume could be non-blocking in some cases.
If you give it a timeout of 0, it will return the next message if one is available, and null otherwise, without blocking.
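A sketch of using a zero timeout to take whatever is already buffered without waiting, assuming a consume delegate that returns the next buffered message or null, as described above. `DrainBuffered` and `maxBatch` are hypothetical names, and a queue stands in for the client's buffer. A zero timeout only sees messages the client has already fetched, so an empty result shortly after subscribing is expected rather than a sign of failure.

```csharp
using System;
using System.Collections.Generic;

#nullable enable

// Hypothetical helper: collect whatever is already buffered without blocking.
// consume(TimeSpan.Zero) is assumed to return the next buffered message or null.
static List<T> DrainBuffered<T>(Func<TimeSpan, T?> consume, int maxBatch) where T : class
{
    var batch = new List<T>();
    while (batch.Count < maxBatch)
    {
        T? message = consume(TimeSpan.Zero);
        if (message == null) break; // buffer empty right now: don't wait
        batch.Add(message);
    }
    return batch;
}

// In-memory stand-in for the consumer's buffer.
var buffer = new Queue<string>(new[] { "m1", "m2", "m3" });
Func<TimeSpan, string?> fakeConsume = _ => buffer.Count > 0 ? buffer.Dequeue() : null;

var first = DrainBuffered(fakeConsume, maxBatch: 2);  // takes m1, m2
var second = DrainBuffered(fakeConsume, maxBatch: 2); // takes m3, then sees an empty buffer
var third = DrainBuffered(fakeConsume, maxBatch: 2);  // nothing buffered: returns immediately
Console.WriteLine($"{first.Count} {second.Count} {third.Count}");
```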
Providing a timeout of 0 to the Consume method doesn't fetch any messages; I've tried it with no luck. Is there any other way to achieve this, or is it working for someone else?
Yep. In our setup, any timeout under 300 ms always returns null, which makes it useless.
@Annamalai-Niyama, @pavel-agarkov - fetching messages from the cluster is essentially independent of whether you're calling Consume, which just reads them from an in-memory buffer. If you're not getting messages from Consume, it suggests something else is up. Feel free to provide demonstrating code / debug=all logs.
@mhowlett please find the demonstration code below
```c#
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

public class ConsumerWrapper
{
    private readonly string _topicName;
    private readonly ConsumerConfig _consumerConfig;
    private readonly Consumer<string, string> _consumer;

    public ConsumerWrapper(ConsumerConfig config, string topicName)
    {
        _topicName = topicName;
        _consumerConfig = config;
        _consumer = new Consumer<string, string>(_consumerConfig);
        _consumer.Subscribe(topicName);
    }

    public string ReadMessage()
    {
        // With a zero timeout, Consume returns null when no message is buffered yet.
        var consumedResult = _consumer.Consume(TimeSpan.Zero);
        return consumedResult?.Value ?? "No Message in the topic";
    }
}

public class MessageService : BackgroundService
{
    private readonly ConsumerConfig _consumerConfig;
    private readonly ProducerConfig _producerConfig;

    public MessageService(ConsumerConfig consumerConfig, ProducerConfig producerConfig)
    {
        _producerConfig = producerConfig;
        _consumerConfig = consumerConfig;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // A new consumer (and subscription) is created on every iteration.
            var consumerWrapper = new ConsumerWrapper(_consumerConfig, "dailydigest");
            string consumedMessage = consumerWrapper.ReadMessage();
            Console.WriteLine(consumedMessage);
        }
    }
}
```
I'm calling the ReadMessage method in a background service that runs every 2 seconds. In the code above, because the Consume method blocks the main thread, the background service waits for the main thread, which defeats the purpose.
As suggested, I tried using TimeSpan.Zero in the Consume method, which returns control to the main thread immediately, but no messages are consumed.
Please let me know if any further details are required.
Your problem is that you're constructing a consumer and subscribing to a topic before every consume. Constructing a consumer is very expensive, and there is quite a bit of back-and-forth with the cluster before it can start consuming messages (so there is a significant delay to the first message). You should only ever make one consumer instance, and you should keep it alive over the lifetime of your application.
Other notes:
Because we don't have ConsumeAsync yet, rather than use HostedService, I would just set up a dedicated background thread (tied to the app lifetime) and run a standard synchronous consume loop in it. This is completely fine, just not idiomatic C# (everything is async these days). It's actually more than fine: it will be measurably more performant than an async approach, because async comes with a fair bit of overhead relative to the number of messages per second you can get out of the Kafka consumer.
Alternatively, you could fake an async consume method using `await Task.Run(() => consumer.Consume(timeout))`. That has a lot more overhead than approach #1, but it will allow you to use the standard hosted service pattern (you'll still get hundreds of thousands of messages a second out of it). Don't use the timer approach; use the async loop approach.
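The dedicated-thread approach might look like the following minimal sketch. `StartConsumeThread` is a hypothetical helper, and `BlockingCollection<T>.Take(CancellationToken)` stands in for a blocking `consumer.Consume(cancellationToken)` call: it blocks until an item arrives and throws `OperationCanceledException` once the token is cancelled, which is the behavior this loop assumes of the real consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical: run a synchronous consume loop on its own background thread.
// `consume` blocks until a message arrives and throws OperationCanceledException on cancel.
static Thread StartConsumeThread(Func<CancellationToken, string> consume, Action<string> handle, CancellationToken ct)
{
    var thread = new Thread(() =>
    {
        try
        {
            while (true)
            {
                string message = consume(ct); // blocking is fine on a dedicated thread
                handle(message);
            }
        }
        catch (OperationCanceledException)
        {
            // normal shutdown path
        }
    })
    {
        IsBackground = true, // don't keep the process alive on its own
        Name = "consume-loop",
    };
    thread.Start();
    return thread;
}

var source = new BlockingCollection<string>(); // stand-in for the consumer
var handled = new ConcurrentQueue<string>();
using var stop = new CancellationTokenSource();

var worker = StartConsumeThread(token => source.Take(token), m => handled.Enqueue(m), stop.Token);
source.Add("hello");
source.Add("world");
while (handled.Count < 2) Thread.Sleep(10); // wait until both messages are processed
stop.Cancel(); // in a real app, tied to application shutdown
worker.Join(); // the loop exits via OperationCanceledException
Console.WriteLine(string.Join(" ", handled));
```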
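The async-loop alternative can be sketched as follows, with a `BlockingCollection<T>` standing in for the consumer and `RunConsumeLoopAsync` as a hypothetical name. Each blocking call is offloaded with `Task.Run` and awaited before the next one is requested, so messages are handled one at a time and in order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async loop: offload each blocking consume call with Task.Run,
// then handle the message before asking for the next one (keeps order).
static async Task RunConsumeLoopAsync(
    Func<CancellationToken, string> consume, Func<string, Task> handle, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        string message;
        try
        {
            // The blocking call runs on a thread-pool thread, not the caller's.
            message = await Task.Run(() => consume(ct), ct);
        }
        catch (OperationCanceledException)
        {
            break; // shutdown requested while waiting for a message
        }
        await handle(message);
    }
}

var source = new BlockingCollection<string>(); // stand-in for the consumer
source.Add("a");
source.Add("b");
var seen = new List<string>();
using var cts = new CancellationTokenSource();

await RunConsumeLoopAsync(
    token => source.Take(token),
    m =>
    {
        seen.Add(m);
        if (seen.Count == 2) cts.Cancel(); // stop after both messages
        return Task.CompletedTask;
    },
    cts.Token);
Console.WriteLine(string.Join(",", seen));
```

In a BackgroundService, a loop like this would be awaited from ExecuteAsync with its stoppingToken. Each iteration still holds a thread-pool thread while it waits, which is where the extra overhead comes from.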
Any news on this? It doesn't seem to have been included in 1.5.0. Do you plan on adding it later, or did you just give up?
@mhowlett Can you help me? `consumer.Consume(cancelToken)` does not respond to cancelToken when the token fires:
```c#
public class SimpleKafkaConsumerAdapter : IKafkaConsumerAdapter
{
private readonly ConsumerConfig _consumerConfig;
private readonly IConsumer<string, string> _consumer;
private readonly ILogger<SimpleKafkaConsumerAdapter> _logger;
public SimpleKafkaConsumerAdapter()
{
_logger = LoggerHelper.GetLogger<SimpleKafkaConsumerAdapter>(nameof(SimpleKafkaConsumerAdapter));
_consumerConfig = new ConsumerConfig(config)
{
GroupId = Shared.ProjectInfo.Name,
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumerConfig.EnableAutoCommit = false;
_consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
this._logger.LogInformation($"Initialized {GetType()}...................");
}
public void ConsumeOneByOne(string topic, Func<KafkaMessage, Task> func)
{
throw new NotImplementedException();
}
public void ConsumeOneByOne(string topic, Func<KafkaMessage, Task> func, CancellationToken cancellationToken)
{
_consumer.Subscribe(topic);
while (true)
{
if (cancellationToken.IsCancellationRequested) break;
var r = _consumer.Consume(cancellationToken);
if (cancellationToken.IsCancellationRequested) break;
try
{
_logger.LogDebug($"Consumer get new msg {topic}.....");
func(new KafkaMessage() { Key = r.Message.Key, Value = r.Message.Value, Topic = r.Topic });
_consumer.Commit();
}
catch (KafkaException e)
{
this._logger.LogError($"Consume failed, {e}");
}
}
}
public void StopConsume()
{
// _consumer.Unsubscribe();
_consumer.Close();
}
}
public class SendToDevBackgroundService : BackgroundService
{
public readonly SendToDev _sendToDev;
public readonly IKafkaConsumerAdapter _kafkaAdapter;
private readonly ILogger<SendToDevBackgroundService> _logger;
public SendToDevBackgroundService()
{
_sendToDev = new SendToDev();
_kafkaAdapter = new SimpleKafkaConsumerAdapter();
_logger = LoggerHelper.GetLogger<SendToDevBackgroundService>(nameof(SendToDevBackgroundService));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ExecuteAsync..................................");
_kafkaAdapter.ConsumeOneByOne("db_alert", _sendToDev.HandleAsync, stoppingToken);
await Task.CompletedTask;
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("StopAsync..................................");
_kafkaAdapter.StopConsume();
return base.StopAsync(cancellationToken);
}
}
```
You shouldn't try to fit the consumer into the BackgroundService pattern: none of its methods are async, so it's not an appropriate pattern. Just use a long-running thread. This is pretty straightforward and has no real downside (but we need a cut 'n paste example with explanation...).
Your ConsumeOneByOne method is blocking, so you shouldn't be calling it in the ExecuteAsync method. Also, you shouldn't be calling Subscribe in this method; you typically do that once, on consumer startup.
I'm not immediately sure why the Consume method doesn't cancel (possibly something related to how async/await works). I'd need to investigate, but I would first write the background consume task as suggested above. There is an integration test covering the cancellation token, and it passes.
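A sketch of the long-running-thread setup suggested above that also subscribes once at startup and calls Close only after the consume loop has exited, so Close never overlaps an in-flight Consume call (StopConsume above calls Close from the host's stop path while ConsumeOneByOne may still be blocked in Consume). `StartWorker` and its four delegates are hypothetical stand-ins for `Subscribe`, `Consume(CancellationToken)`, the message handler, and `Close`; the usage drives them with in-memory fakes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Hypothetical lifecycle for a long-running consumer thread: subscribe once at
// startup, consume in a loop, and close only after the loop has exited.
static Action StartWorker(
    Action subscribe, Func<CancellationToken, string> consume,
    Action<string> handle, Action close)
{
    var cts = new CancellationTokenSource();
    subscribe(); // once, at startup: not before every consume
    var thread = new Thread(() =>
    {
        try
        {
            while (true) handle(consume(cts.Token));
        }
        catch (OperationCanceledException)
        {
            // expected when stop is requested
        }
        finally
        {
            close(); // runs on the consume thread, after the last consume call returns
        }
    }) { IsBackground = true };
    thread.Start();

    // The returned delegate is what an application's shutdown hook would call.
    return () =>
    {
        cts.Cancel();
        thread.Join();
        cts.Dispose();
    };
}

var events = new ConcurrentQueue<string>();
var inbox = new BlockingCollection<string>(); // stand-in for the consumer
var stopWorker = StartWorker(
    subscribe: () => events.Enqueue("subscribe"),
    consume: token => inbox.Take(token),
    handle: m => events.Enqueue("handle:" + m),
    close: () => events.Enqueue("close"));

inbox.Add("x");
while (!events.Contains("handle:x")) Thread.Sleep(10);
stopWorker();
Console.WriteLine(string.Join(" ", events));
```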
Thanks for your attention. When I change
```c#
_kafkaAdapter.ConsumeOneByOne("db_alert", _sendToDev.HandleAsync, stoppingToken);
await Task.CompletedTask;
```
to
```c#
await Task.Run(() => _kafkaAdapter.ConsumeOneByOne("db_alert", _sendToDev.HandleAsync, stoppingToken));
```
the cancellation token is correctly triggered.
@naughtyGitCat When you do that, you queue a blocking I/O-bound operation to the thread pool, which ties up a thread-pool thread until the operation finishes. This is not good practice: blocking thread-pool threads makes the pool create new threads to compensate, which causes performance and memory issues. In ASP.NET Core in particular, we should avoid tying up thread-pool threads, because that can slow down ASP.NET's own request processing.
In conclusion, we really need a truly asynchronous ConsumeAsync method.
Thanks. If I change
```c#
await Task.Run(() => _kafkaAdapter.ConsumeOneByOne("db_alert", _sendToDev.HandleAsync, stoppingToken));
```
to
```c#
await Task.FromResult(() => _kafkaAdapter.ConsumeOneByOne("db_alert", _sendToDev.HandleAsync, stoppingToken));
```
does that avoid queuing an I/O-bound operation to the thread pool, which blocks a thread-pool thread until the operation finishes?
@naughtyGitCat Yes, it does not block the thread pool, but only because nothing runs at all. Task.FromResult simply wraps the value you pass it in an already-completed Task; here that value is the lambda itself, so ConsumeOneByOne is never invoked. (This is different from new Task(...), which creates an unstarted task around a delegate.) Either way, there is no asynchronous operation.
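A small, self-contained check of what `Task.FromResult` and `Task.Run` each do with a delegate they are given. `work` is a hypothetical stand-in for the `ConsumeOneByOne` call, and the counter records whether it actually ran.

```csharp
using System;
using System.Threading.Tasks;

int calls = 0;
Action work = () => calls++; // hypothetical stand-in for the blocking ConsumeOneByOne call

// Task.FromResult wraps the value it is given in an already-completed task.
// Here that value is the delegate itself, so the delegate is never invoked.
Task<Action> wrapped = Task.FromResult(work);
await wrapped;
int callsAfterFromResult = calls;
bool wasCompleted = wrapped.IsCompleted;

// Task.Run queues the delegate to the thread pool and actually executes it.
await Task.Run(work);
int callsAfterRun = calls;

Console.WriteLine($"FromResult: calls={callsAfterFromResult}, completed={wasCompleted}");
Console.WriteLine($"Task.Run: calls={callsAfterRun}");
```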
@mhowlett Any updates on this?
A fully async/await-enabled consumer is something we'd like, but it's not a near-term priority because it's quite a bit of work and doesn't enable any capability that isn't possible with the current API.