Confluent-kafka-dotnet: How can I implement Kafka in microservices in .NET Core 2.0?

Created on 6 Nov 2017 · 15 Comments · Source: confluentinc/confluent-kafka-dotnet

I need to implement Kafka in microservices in .NET Core 2.0. All the examples are for console applications.
What I mean is that I want all microservice-to-microservice communication to go through Kafka. Can you provide a sample API in .NET Core that uses Kafka?
I have been stuck for the last 6 days and have not found a solution. Would it be possible?
Please help, @ah- and @mhowlett.

Thanks

question

All 15 comments

You will often implement microservices as console applications. The term microservice is kind of orthogonal to the .NET application type: it just means an application whose scope of work is relatively small.

@mhowlett So I cannot integrate Kafka into microservices? By microservices I mean web APIs.

Ahh, right. In the Kafka world, we think about microservices a little bit differently. See e.g. https://www.confluent.io/blog/data-dichotomy-rethinking-the-way-we-treat-data-and-services/

Yes, you can reference the Confluent.Kafka package just like in a console application. In a web API project, ideally you will create a wrapper class around your Producer or Consumer instance to make it available via dependency injection. Or, more simply, you could just make it a static member of the Startup class.

It's a good suggestion to make an example of doing this, I'll put it on the todo list.

Thanks @mhowlett .

@mhowlett Any idea when you will be able to provide an example of doing this in an API?

something along the lines of:

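    // Namespaces assume the pre-1.0 client API that this snippet is written against.
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Serialization;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;

    public interface IKafkaClient
    {
        Task<Message<string, string>> Produce(string key, string val);
    }

    // Wraps one long-lived Producer so it can be shared via dependency injection.
    public class KafkaClient : IKafkaClient
    {
        private readonly Producer<string, string> producer;

        public KafkaClient(IConfiguration globalconf)
        {
            // Broker list is read from the application configuration.
            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", globalconf.GetValue<string>("Kafka:BootstrapServers") }
            };

            producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
        }

        public async Task<Message<string, string>> Produce(string key, string val)
        {
            return await producer.ProduceAsync("my-topic", key, val);
        }
    }

    // In the Startup class: register the wrapper as a singleton.
    public void ConfigureServices(IServiceCollection services)
    {
        ...
        services.AddSingleton<IKafkaClient, KafkaClient>();
    }

    // The controller receives the shared client through constructor injection.
    [Route("api/[controller]")]
    public class ValuesController : Controller
    {
        private readonly IKafkaClient producer;

        public ValuesController(IKafkaClient producer)
        {
            this.producer = producer;
        }
        ...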

Thanks @mhowlett for providing the code snippet.

closing this, we're tracking the task internally.

@mhowlett thanks for the lines above. Can you please explain how you write your consumers in web APIs?
Each example has a while loop, which seems terrible. Do you have an example of how to raise an event when a new message is received?

Thanks

@subprime You should run the consumer in a background host, such as a HostedService in ASP.NET Core.

@subprime I have the same feeling about the while loop. Could anyone help us with a code snippet of an ideal consumer? Also, can I define the number of messages received per instance of my service? (i.e. I want to process 10 messages per second on each instance.)

First, note that consuming gets complicated at scale because partitioning will affect your entire architecture (not just how you consume from Kafka). Assuming your app doesn't need to scale past a single web server (most won't), it's more straightforward. Given the current design of the client, you should maintain a dedicated thread for a consumer poll loop over the lifetime of the web server. This will work great. Don't use HostedService; you'd need ConsumeAsync for that. There are technical reasons why we haven't implemented that yet, though we do plan to at some point. It's not the highest priority, though, because you can work around it; it just might not be as idiomatic as you might like. We're prioritizing features like transactions and fetch from follower first.
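For readers who want the dedicated poll thread to surface messages as events, here is a minimal sketch of one way it could look. `PollLoop<T>` and its members are hypothetical names, not part of Confluent.Kafka. The `poll` delegate is an assumption standing in for whatever blocking consume-with-timeout call your client version offers, and it is expected to return null when no message arrived in time.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of Confluent.Kafka): runs a blocking poll
// function on its own thread and raises an event for every message.
public sealed class PollLoop<T> : IDisposable where T : class
{
    // Stand-in for the client's blocking consume call with a timeout.
    private readonly Func<TimeSpan, T> poll;
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private readonly Thread thread;

    // Raised on the poll thread, so a slow handler delays the next poll.
    public event Action<T> MessageReceived;

    public PollLoop(Func<TimeSpan, T> poll)
    {
        this.poll = poll;
        thread = new Thread(Run) { IsBackground = true, Name = "kafka-poll-loop" };
    }

    public void Start() => thread.Start();

    private void Run()
    {
        while (!cts.IsCancellationRequested)
        {
            // A short timeout lets the loop notice cancellation promptly.
            T message = poll(TimeSpan.FromMilliseconds(100));
            if (message != null)
            {
                MessageReceived?.Invoke(message);
            }
        }
    }

    public void Dispose()
    {
        cts.Cancel();
        if (thread.IsAlive)
        {
            thread.Join(); // wait for the current poll to return
        }
        cts.Dispose();
    }
}
```

One possible arrangement is to create the loop once at application startup, subscribe handlers to `MessageReceived`, call `Start()`, and dispose it on shutdown. The while loop does not go away; it is just hidden behind the event. Exceptions thrown by the poll call or by a handler are not caught in this sketch and would need handling in real code.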

@mhowlett Is this the workaround for the Consumer scenario in its simplest form?

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.Run(() =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var message = _consumer.Consume(stoppingToken);

            }
        }, stoppingToken);
    }

_For sure one would consume and process messages on different threads in order not to reduce consuming speed. But this is another topic..._
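As a rough illustration of consuming and processing on different threads, the sketch below hands messages from the poll thread to worker tasks through a bounded buffer. `MessagePump<T>`, `Enqueue`, and the default worker count and capacity are all hypothetical choices for illustration, not anything from Confluent.Kafka.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: the poll thread calls Enqueue, and worker tasks
// run the handler, so slow processing does not stall polling.
public sealed class MessagePump<T> : IDisposable
{
    private readonly BlockingCollection<T> buffer;
    private readonly Task[] workers;

    public MessagePump(Action<T> handler, int workerCount = 2, int capacity = 100)
    {
        buffer = new BlockingCollection<T>(capacity);
        workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = Task.Factory.StartNew(() =>
            {
                // Ends once CompleteAdding is called and the buffer is empty.
                foreach (T message in buffer.GetConsumingEnumerable())
                {
                    handler(message);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Blocks while the buffer is full, which slows the poll loop down
    // instead of letting unprocessed messages pile up in memory.
    public void Enqueue(T message) => buffer.Add(message);

    public void Dispose()
    {
        buffer.CompleteAdding();
        Task.WaitAll(workers); // drain whatever is still buffered
        buffer.Dispose();
    }
}
```

The poll loop would call `Enqueue` for each consumed message. With more than one worker, messages are no longer processed in the order they were consumed, and offsets may be committed before processing has finished, so both points need thought before using this pattern.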

@barbarosalp - I didn't give it too much thought, but I don't see any problem with that approach.

Check out this project; it should fit an ASP.NET Core microservice:

https://github.com/halamah/aspnetcorekafka
