Confluent-kafka-dotnet: How do I implement a producer inside a web api project

Created on 29 Mar 2019  ·  5 Comments  ·  Source: confluentinc/confluent-kafka-dotnet

Description

I am trying to implement a producer inside a .NET Framework WebAPI project.
I found a solution in one of the threads, #360:

public class KafkaClient : IKafkaClient
{
    private Producer<string, string> producer;

    public KafkaClient(IConfiguration globalconf)
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "http://localhost:9092" }
        };

        producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
    }

    public async Task<Message<string, string>> Produce(string key, string val)
    {
        var resp = await producer.ProduceAsync("QuoteTest", key, val);
        return resp;
    }
}

When I call the Produce method from the action method using
await producer.Produce("hello1", "world1")
it takes a long time and then returns a "Local: Message timed out" error. However, this exact same code works when I use it in a .NET console app. I have also tried it in a .NET Core 2.0 WebAPI project, and I face the same issue.
Here is the response object from the Produce method:
(image not preserved)

How to reproduce

Implement the above code snippet inside a .NET framework/Core 2.0 WebAPI project and call the method from an action.

Checklist

Please provide the following information:

  • [x] Confluent.Kafka nuget version: 0.116
  • [x] Apache Kafka version: 2.12-2.2.0
  • [x] Client configuration: .NET framework WebAPI project
  • [x] Operating system: Win 10
question

All 5 comments

Are you setting up your producer as a singleton in Startup.cs?

You can also use dependency injection in Startup.ConfigureServices. If you make your KafkaClient class above IDisposable and dispose the producer in its Dispose method, you can register it as a service with services.AddSingleton<IKafkaClient, KafkaClient>(); and it will be disposed automatically on application shutdown. Then, to use it in a controller, take IKafkaClient as a parameter of the controller constructor and store it in a field of the controller class. You can then use it from any request handler in the controller.
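
The registration described above might look roughly like the following. This is only a sketch, assuming the 0.11.x Confluent.Kafka API used in the question and an ASP.NET Core 2.0 project; QuotesController and its route are hypothetical names for illustration, and the broker address is written as plain host:port.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

public interface IKafkaClient
{
    Task<Message<string, string>> Produce(string key, string val);
}

// One long-lived producer shared by all requests; disposed by the container.
public sealed class KafkaClient : IKafkaClient, IDisposable
{
    private readonly Producer<string, string> producer;

    public KafkaClient()
    {
        var config = new Dictionary<string, object>
        {
            // Assumption: a local broker, given as host:port.
            { "bootstrap.servers", "localhost:9092" }
        };

        producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8));
    }

    public Task<Message<string, string>> Produce(string key, string val)
        => producer.ProduceAsync("QuoteTest", key, val);

    // Called by the DI container on application shutdown.
    public void Dispose() => producer.Dispose();
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();
        // Singleton lifetime: the container creates one instance and disposes it.
        services.AddSingleton<IKafkaClient, KafkaClient>();
    }
}

// Hypothetical controller showing constructor injection.
[Route("api/[controller]")]
public class QuotesController : Controller
{
    private readonly IKafkaClient kafka;

    public QuotesController(IKafkaClient kafka)
    {
        this.kafka = kafka;
    }

    [HttpPost]
    public async Task<IActionResult> Post(string key, string val)
    {
        var message = await kafka.Produce(key, val);
        if (message.Error.HasError)
        {
            return StatusCode(500, message.Error.Reason);
        }
        return Ok();
    }
}
```

Sharing a single producer avoids creating a new broker connection for every request, which is the reason for the singleton lifetime here.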

@mhowlett I had implemented it as you described, as a singleton class. The issue had a simple cause: I was passing "http://" as part of the host URL. When I remove that and pass only "server_name:port_number", it works perfectly. The documentation should mention somewhere that you should not pass "http://" as part of the hostname.
You can mark this thread as closed.
Thanks

It would be a good feature for us to do a runtime check for this. Note that Kafka has a custom binary protocol; it doesn't use HTTP.
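
Until such a check exists in the client, an application can guard against this mistake itself. The helper below is a hypothetical sketch and not part of Confluent.Kafka: BootstrapServers.Validate is an invented name, and it rejects only http:// and https:// prefixes, leaving every other form of broker address for the client to interpret.

```csharp
using System;

// Hypothetical helper, not part of Confluent.Kafka.
public static class BootstrapServers
{
    // Returns the value unchanged if no broker entry starts with http:// or https://.
    public static string Validate(string value)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException("bootstrap.servers must not be empty.", nameof(value));
        }

        // bootstrap.servers is a comma-separated list of broker addresses.
        foreach (var entry in value.Split(','))
        {
            var broker = entry.Trim();
            if (broker.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
                broker.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
            {
                throw new ArgumentException(
                    $"Broker address '{broker}' must be host:port without an http:// or https:// prefix.",
                    nameof(value));
            }
        }

        return value;
    }
}
```

It could be applied where the configuration dictionary is built, for example { "bootstrap.servers", BootstrapServers.Validate("localhost:9092") }, so that a bad address fails at startup instead of surfacing later as a message timeout.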

Thanks for the info!
