Confluent-kafka-dotnet: ProduceAsync blocks indefinitely

Created on 5 Jun 2018 · 34 Comments · Source: confluentinc/confluent-kafka-dotnet

Hi,
this is more of a question; since I don't have an invitation to the Slack, I figured this would be the best place to ask.

I need to write a simple producer for our automation tests and make sure each message is successfully pushed before I continue with test execution, but sadly the producer blocks for an indefinite amount of time.

public async Task PublishPurchases(IEnumerable<BetPurchase> purchases)
{
      foreach (var purchase in purchases)
      {
          await Producer.ProduceAsync("incoming-purchases", purchase.GetHashCode().ToString(), purchase);
      }
}

What I noticed so far:

  • I am using the Burrow dashboard to monitor Kafka, where I can see that messages are successfully pushed, but the call is still blocked.
  • I noticed the remarks comment on Producer<>.ProduceAsync and subscribed to the Producer<>.OnError event to check whether the send queue is full, but no error was raised there.

This comes from remarks as well:

Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

Sadly, I am not sure exactly what this means. Which Poll method needs to be called in another thread? The one on Consumer<>? If so, why would the producer be blocked until the consumer polls? How can I enable background polling?
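
To make the quoted remark concrete, here is a minimal, self-contained sketch of a produce call whose Task completes only when some thread polls for delivery reports. It is a toy model under stated assumptions, not Confluent.Kafka's actual implementation: ToyProducer, its Poll method, and the backgroundPoll flag are hypothetical names invented for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Toy model only: hypothetical names, not the Confluent.Kafka API.
public sealed class ToyProducer : IDisposable
{
    // Delivery reports that have not yet been handed back to callers.
    private readonly ConcurrentQueue<TaskCompletionSource<string>> _pending =
        new ConcurrentQueue<TaskCompletionSource<string>>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Thread _poller;

    public ToyProducer(bool backgroundPoll)
    {
        if (!backgroundPoll) return; // nobody polls unless the caller does
        _poller = new Thread(() =>
        {
            while (!_cts.IsCancellationRequested)
            {
                Poll();
                Thread.Sleep(10);
            }
        }) { IsBackground = true };
        _poller.Start();
    }

    // "Sends" a message; the returned task completes only when Poll() runs.
    public Task<string> ProduceAsync(string value)
    {
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending.Enqueue(tcs);
        return tcs.Task;
    }

    // Serves queued delivery reports and returns how many were completed.
    public int Poll()
    {
        int served = 0;
        while (_pending.TryDequeue(out var tcs))
        {
            tcs.TrySetResult("delivered");
            served++;
        }
        return served;
    }

    public void Dispose()
    {
        _cts.Cancel();
        _poller?.Join();
        _cts.Dispose();
    }
}
```

With new ToyProducer(backgroundPoll: false), the task returned by ProduceAsync stays incomplete until some thread calls Poll(); with backgroundPoll: true it completes on its own shortly after the call.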

HIGH bug

All 34 comments

the producer has a background thread that polls for delivery reports - you can turn this off with a constructor argument, but generally you shouldn't.

what is your producer config?
how long did you wait? (how long is indefinite?).
does Producer.ProduceAsync("incoming-purchases", purchase.GetHashCode().ToString(), purchase).Result work? (i.e. a synchronous call).

Hi @mhowlett ,

  1. What is your producer config? - They are all default, I am only adding "bootstrap.servers" configuration.
  2. How long did you wait? - As far as I recall I gave up after 1 minute of waiting.
  3. Does Producer.ProduceAsync("incoming-purchases", purchase.GetHashCode().ToString(), purchase).Result work? - Yes it does, but I'd really want to omit .Result as it might cause deadlocks https://blog.stephencleary.com/2012/07/dont-block-on-async-code.html

yep, point 3 was not a suggestion, just trying to debug. it implies something is up with async code execution. can you provide more details of what you're doing (complete minimal code that demonstrates the problem, test framework? standalone app?) so that I can try and replicate. you might also like to try 1.0-experimental-3, but I can't think of anything in that that would address this problem.

I have the same issue. I try to push data to Kafka; the data is pushed correctly with no errors, but the produce method never completes.

public async Task ProduceToKafka<TKey, TValue>(List<Tuple<TKey, TValue>> batch, TopicNamesEnum topicName)
        {
            using (var producer = new Producer<TKey, TValue>(_config, new AvroSerializer<TKey>(), new AvroSerializer<TValue>()))
            {
                producer.OnError += (_, e) =>
                {
                    _logger.Error(e);
                };

                var tasks = batch.Select(x => producer.ProduceAsync(topicName.ToString(), x.Item1, x.Item2)).ToList();
                await Task.WhenAll(tasks);
                _logger.Trace($"{batch.Count} events are written to Kafka. Topic name: {topicName}.");
            }
        }

[TestMethod]
        public async Task TestKafkaProducerAndConsumer()
        {
            const string testKey = "test_key";
            const string testSValue = "test_string_value";
            const int testIValue = 10;

            var kafka = new KafkaUtils("192.168.56.101:9092", "http://192.168.56.101:8081", 1000);

            await kafka.ProduceToKafka(new System.Collections.Generic.List<System.Tuple<TestKey, Test>> {
                new System.Tuple<TestKey, Test>
                (
                    new TestKey
                    {
                        name = testKey
                    },
                    new Test
                    {
                        ival = testIValue,
                        sval = testSValue
                    }
                )
            }, TopicNamesEnum.test10);
        }

Config is default. I set bootstrap servers and schema registry only.

I think that it is related with this issue https://github.com/confluentinc/confluent-kafka-dotnet/issues/406

Hi @mhowlett, I am sorry for the delay. Reproducing is really easy for me: just take the example from the GitHub page and start a local Kafka with Docker.

What I noticed now that I hadn't before:

  • The first X awaits of ProduceAsync succeed without a problem, and after a certain amount of time it starts blocking.
  • I left a console application running with the task blocked to see how long it would block; it was something like 4-5 minutes, maybe more (not sure whether that is related to confluent.net or to some timeout from the CLR).

I will configure Burrow and see how many messages go through before it starts blocking, if you want. This behavior really looks like the documented one about the send queue, but as far as I have read, a full queue raises an error through Producer<>.OnError, and no event was raised there, so I don't know what to look for.
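
For test code that must not hang while a problem like this is being investigated, one option is to bound the wait on the returned task. The sketch below uses only the .NET base class library; WithTimeout is a hypothetical helper name, not part of Confluent.Kafka, and timing out only stops the waiting, it does not cancel the underlying produce request.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeoutExtensions
{
    // Awaits `task`, but throws TimeoutException if it has not completed
    // within `timeout`. The underlying operation itself is NOT cancelled.
    public static async Task<T> WithTimeout<T>(this Task<T> task, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var delay = Task.Delay(timeout, cts.Token);
            var winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (!ReferenceEquals(winner, task))
                throw new TimeoutException($"Task did not complete within {timeout}.");

            cts.Cancel(); // stop the timer, it is no longer needed
            return await task.ConfigureAwait(false); // propagates result or exception
        }
    }
}
```

Applied to the call from the original post, this would look like await Producer.ProduceAsync("incoming-purchases", key, purchase).WithTimeout(TimeSpan.FromSeconds(30)), so a stuck delivery report fails the test with a TimeoutException instead of blocking the run.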

I'm facing the same issue, code is very similar to one that @marpit has. I do have delivery report enabled. The message is sent, but the task returned from ProduceAsync never completes. Interesting that this only happens in tests (target .NET Framework 4.7), but not in the real application (targets .NET Core 2.0) or console sample (targets .NET Core 2.0 or .NET Framework 4.7).

I've also found several old issues which sound the same: https://github.com/confluentinc/confluent-kafka-dotnet/issues/92, https://github.com/confluentinc/confluent-kafka-dotnet/issues/190 and https://github.com/confluentinc/confluent-kafka-dotnet/issues/350, but it seems they were not completely hunted down.

Confluent.Kafka version: 0.11.4 (0.11.0 and 0.11.2 work fine btw!)

Producer config (all other properties are default):
bootstrap.servers = ...
acks = 1
retries = 3
batch.num.messages = 50
queue.buffering.max.ms = 10
socket.blocking.max.ms = 10
log.connection.close = false
socket.nagle.disable = true
message.timeout.ms = 5 minutes
message.send.max.retries = 2

@andreycha I agree, the problem occurs only in tests; the console app works correctly.

IIRC 0.11.4 contains a 'fix' for a very related issue, so the difference in behavior from 0.11.2 is not completely surprising - it looks like I fixed one scenario and broke another.

I'll prioritize getting this fixed for 0.11.5.

I just quickly tried to replicate on .net core 2.1, osx, Xunit and didn't see an issue (will investigate further later). One thing you could try @marpit (and let me know if it makes any difference!) is inserting a producer.Flush(TimeSpan.FromSeconds(10)) before the end of the producer using block. perhaps the producer is being disposed immediately (and hence the task will never complete). I don't see why this would be happening, but it's not a bad theory.

@mhowlett I tried Flush with same result.

@mhowlett ProduceAsync hangs immediately after call. Regarding my test setup -- .NET Framework 4.7, Win 10 x64, NUnit 3.8.1. Issue could also be test framework dependent... @marpit what setup do you have?

@mhowlett I can confirm that I am using NUnit as well. Could it be an issue with NUnit specifically?

Any thoughts from @CharliePoole maybe? Somehow related to https://github.com/nunit/nunit/issues/2395?

I just made a simple async test. NUnit 3.10.1, Confluent.Kafka 0.11.4, .NET Framework 4.6.1, Win10 x64 - ProduceAsync didn't hang. Maybe provide a complete minimal example (zipped up) that I can run in order to try and reproduce.

   [TestFixture]
    public class MyTests
    {
        [Test]
        public async Task Test()
        {
            var config = new Dictionary<string, object> { { "bootstrap.servers", "10.200.7.144:9092" } };
            using (var p = new Producer<Null, Null>(config, null, null))
            {
                var dr = await p.ProduceAsync("tt5", null, null);
                Assert.False(dr.Error.HasError);
            }
        }
    }

We just recently spent a lot of time making our multi-target (NETStandard2.0 + FW451 on Win) wrapping client (as in, it wraps this library) deadlock-free for all the configurations (see above) that we support. This included producing, consuming and disposing.

That longish intro was only meant to say that we had to work through a bunch of different scenarios several different ways, I can't be sure we got everything, but here are a few things I'd suggest based on that experience (and limiting it generally to producing):

  • Do not mix TPL and async/await in your code
  • ConfigureAwait(false) everything unless you absolutely can't
  • If possible, determine if the issue is confined to core/linux-or-osx or fw/win and worry about core/win later (if it's fw/linux-or-osx, definitely try the other options)
  • Strip it down - comment-out those OnError handlers and see if the issue disappears (before you ConfigureAwait everything then, if it does, ConfigureAwait and add it back in and see if it stays gone), and maybe try not awaiting inside the loop but instead do a single Task.WhenAll (or do you need them to be produced strictly ordered?)

These are the things I'd start with for a producing issue like this. Hope that helps someone....
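
The general hazard behind the ConfigureAwait(false) advice can be sketched without Kafka at all. The example below is a toy model, assuming a context that runs all continuations on one dedicated thread (as a UI thread does); SingleThreadContext and DeadlockDemo are hypothetical names, and nothing here claims that this is the cause of the hang reported in this thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Queues continuations for one dedicated thread (hypothetical toy context).
public sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();

    public override void Post(SendOrPostCallback d, object state) =>
        _queue.Add(() => d(state));

    public void Complete() => _queue.CompleteAdding();

    // Runs queued work on the calling thread until Complete() is called.
    public void RunPump()
    {
        foreach (var action in _queue.GetConsumingEnumerable()) action();
    }
}

public static class DeadlockDemo
{
    private static async Task<int> GetValueAsync(bool captureContext)
    {
        // With captureContext == true the continuation is posted back to
        // the single-threaded context; with false it runs on the thread pool.
        await Task.Delay(50).ConfigureAwait(captureContext);
        return 42;
    }

    // Returns true if blocking on the async method finished within one second.
    public static bool BlockingWaitCompletes(bool captureContext)
    {
        bool completed = false;
        var thread = new Thread(() =>
        {
            var ctx = new SingleThreadContext();
            SynchronizationContext.SetSynchronizationContext(ctx);
            ctx.Post(_ =>
            {
                // Sync-over-async: this blocks the only thread the context has.
                completed = GetValueAsync(captureContext).Wait(TimeSpan.FromSeconds(1));
                ctx.Complete();
            }, null);
            ctx.RunPump();
        });
        thread.Start();
        thread.Join();
        return completed;
    }
}
```

DeadlockDemo.BlockingWaitCompletes(true) times out because the continuation is queued behind the very thread that is waiting for it, while BlockingWaitCompletes(false) finishes; the timeout stands in for what would otherwise be an indefinite block.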

@mhowlett well, your example hangs for me. NUnit 3.10.1, Confluent.Kafka 0.11.4, .NET Framework 4.7, Win10 x64. I've also noticed that after about 10 seconds of hanging, I start to get lots of following events in the debug:

Exception caught: 'System.IO.FileLoadException' in Confluent.Kafka.dll ("Could not load file or assembly 'System.Runtime.CompilerServices.Unsafe, Version=4.0.3.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)")
Exception caught: 'System.IO.FileLoadException' in Confluent.Kafka.dll ("Could not load file or assembly 'System.Runtime.CompilerServices.Unsafe, Version=4.0.3.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)")
Hyperlink: Activate Historical Debugging 22.31s [25496] Worker Thread

What kind of logs or additional information can I provide?

@kuskmen, @marpit could you please reproduce given example and let us know how it goes for you?

Figured I'd add my two cents.

I can consistently reproduce this behavior by executing the following script in F# Interactive (v10.1.0 for F# 4.1, the one that ships with Visual Studio 2017 15.7.4) on Windows 10 build 17134 with .NET Framework 4.7.2 (HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\NET Framework Setup\NDP\v4\Full\Release is 461808, docs) using Confluent.Kafka 0.11.4

#r "Confluent.Kafka"

open Confluent.Kafka

let broker = "..."
let topic = "test-topic"

let p =
    let config =
        [ "max.in.flight.requests.per.connection", box 1
          "request.required.acks", box -1
          "produce.offset.report", box true
          "bootstrap.servers", box broker
          "debug", box "broker,topic,msg" ]
        |> Map.ofList
    new Producer(config, false, false)

p.OnLog |> Event.add (fun l -> printfn "%s" l.Message)
p.OnError |> Event.add (fun e -> printfn "%s" e.Reason)

printfn "Start"

let conv (s: string) = System.Text.Encoding.UTF8.GetBytes s |> System.ArraySegment
let k = conv "test"
let v = conv "ing"

let t = p.ProduceAsync (topic, k.Array, k.Offset, k.Count, v.Array, v.Offset, v.Count, true)
let m = t.Result // <------ Never completes!

printfn "Done, produced to partition %i" m.Partition

The curious part is that I see the following lines appear in the logs and I have confirmed through other means that the message does indeed get published to the topic:

[thrd:10.135.1.19:9092/1]: 10.135.1.19:9092/1: test-topic [0]: Produce MessageSet with 1 message(s) (75 bytes, ApiVersion 3, MsgVersion 2)
[thrd:10.135.1.19:9092/1]: 10.135.1.19:9092/1: test-topic [0]: MessageSet with 1 message(s) delivered

Jumping into the debugger shows the Task returned by ProduceAsync stuck in the WaitingForActivation state.

Confluent.Kafka 0.11.3 behaves as expected (the Task completes and returns a Message).

Edit: fixed typo in code snippet above.

I'm running on about 90 minutes of sleep, so forgive me if I'm off here,
but...

  • that should read "let m = t.Result", right?
  • even though it seems like it should be fine (because it's a console app and they're generally immune from thread context deadlocks), in this scenario:

    • a produce request is being placed on an internal queue for batching/actual production to the broker
    • an internal thread/task is waiting for a broker (the leader for the partition which.... I won't go into it. I'm sure you know this stuff and the relevant part is just the client-side task/thread)
    • a continuation task is created in WaitingForActivation state representing the to-be delivery report, and returned to you
    • the app accesses the value of the continuation with a blocking "Result" call

That should all be fine (but only because it looks like it's running on
the main thread of a console app), but any time I see something blocking on
async code (or mixing "tpl" with "async/await" which I know is a terrible
description) and there's a deadlock type issue (check, and check, in this
case), the first things I'd suggest would be (in no particular order):

  • ConfigureAwait(false) on the "async" thing, or...
  • Don't block on the async code (go full-on async - which can be a
    little annoying with a console app, but...), or..
  • Both, or...

Anyway - I have a reasonably good feeling that one of the first two above
will at least lead in the right direction (and I'd be curious to know as we
use this library a lot)...

@sfrooster ah, you're right. That was originally all one line in my script but I tried splitting it up for clarity. That's what I get for refactoring in the comment editor I suppose. :)

Additionally I failed to mention that I've already tried using ConfigureAwait(false) on that Task to no avail. Also this script was meant to serve as a minimum viable reproduction of the issue that I encountered in my full-on async application code.

@sfrooster please note that the originally reported issue doesn't involve mixing async and sync code.

I have a strange problem. I'm trying to produce some data asynchronously to several topics.
My config:
var config = new Dictionary<string, object> { {"batch.num.messages", 1500}, {"queue.buffering.max.ms", 2000}, {"bootstrap.servers", ip}, {"message.timeout.ms", 3000}, {"socket.timeout.ms", 3000} };

var tasks = x.Select(u => Task.Run(async () =>
    {
        Logger.Trace($"Task {u.topic} began");
        try
        {
            using (var producer =
                new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
               var deliveryreport = await producer.ProduceAsync(u.topic, null, u.serialized);
               if (deliveryreport.Error.HasError)
               {
                   Logger.Error($"Error {deliveryreport.Error.Reason}");
                   throw new Exception(deliveryreport.Error.Reason);
               }
               else
               {
                   Logger.Trace($"Status: {deliveryreport.Error.Reason}, Code {deliveryreport.Error.Code}");
               }
            }
        }
        finally
        {
            Logger.Trace($"Task {u.topic} is finished");
        }
    })).ToList(); 

while (tasks.Count != 0)
    {
        Logger.Trace($"Starting to await");
        var task = await Task.WhenAny(tasks);

        Logger.Trace($"Await finished");
        if (task.Exception != null && task.Exception.InnerExceptions.Any())
        {
            Logger.Error($"Logging to file");
        }
        else
        {
            Logger.Trace("Success");
        }
        Logger.Trace($"End");
        tasks.Remove(task);
    }

I want to get an error if the Kafka broker is unavailable and log my message to a file.
When I launch my code in the debugger it works well, but when I install my program as a service the tasks block indefinitely. @mhowlett What's wrong?

There is a deadlock scenario that can happen during Dispose of the Producer / Consumer if the Dispose method is called on a thread that is also required by one of the registered events (OnLog, OnError, etc..). It seems likely this is the root cause of the problems in this thread. This has been resolved in the 1.0-experimental-8 nuget package. Would appreciate people trying this out and reporting back as to whether this issue is solved for them.
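
The shape of such a deadlock can be sketched with a toy model. The code below is not the library's implementation: ToyClient, Shutdown, and the lock used to stand in for "something only the disposing thread can provide" are all assumptions made for illustration, and a timeout replaces the indefinite block so the example terminates.

```csharp
using System;
using System.Threading;

// Toy client (hypothetical): a worker thread raises OnLog during shutdown,
// and Shutdown waits for that worker to exit.
public sealed class ToyClient
{
    private readonly ManualResetEventSlim _stop = new ManualResetEventSlim();
    private readonly Thread _worker;

    public event Action<string> OnLog;

    public ToyClient()
    {
        _worker = new Thread(() =>
        {
            _stop.Wait();
            OnLog?.Invoke("shutting down"); // handler runs on the worker thread
        }) { IsBackground = true };
        _worker.Start();
    }

    // Returns true if the worker exited within the timeout.
    public bool Shutdown(TimeSpan timeout)
    {
        _stop.Set();
        return _worker.Join(timeout);
    }
}

public static class DisposeDeadlockDemo
{
    public static bool ShutdownCompletes(bool handlerNeedsCallersLock)
    {
        var gate = new object();
        var client = new ToyClient();
        if (handlerNeedsCallersLock)
            client.OnLog += _ => { lock (gate) { } }; // needs what the caller holds

        lock (gate)
        {
            // The caller holds `gate` while waiting for the worker, and the
            // worker's handler waits for `gate`: neither side can proceed.
            return client.Shutdown(TimeSpan.FromMilliseconds(500));
        }
    }
}
```

ShutdownCompletes(true) times out because the handler and the shutting-down thread wait on each other, while ShutdownCompletes(false) returns promptly.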

@mhowlett thanks a lot for your investigation.
Here is what I've found. With working brokers and an established connection to the Kafka server, I noticed degraded performance of async producing, as in the example above (https://github.com/confluentinc/confluent-kafka-dotnet/issues/545#issuecomment-404439056). Moreover, if I have a lot of async send tasks, I have to remove the "message.timeout.ms" parameter from the config, because 3000 ms is no longer enough (10000 is not enough either): I get a "Local: Message timed out" error. So to avoid this error I have to remove the value from the config, but when the Kafka server is down I need this parameter set in order to catch the unavailable-server error as soon as possible. And yes, "await producer.ProduceAsync" is now NOT blocked indefinitely, but the performance problems described above remain.

@massarakh - it's very costly to construct a Producer - tcp connections are established to all brokers and there are various metadata and api feature request calls that occur before the producer is able to perform the actual produce request. You are creating a Producer instance for each message you send - this is extremely inefficient.
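
One common way to share a single expensive instance is lazy, thread-safe initialization. The sketch below uses a stand-in class rather than the real producer: ExpensiveClient and SharedClient are hypothetical names, and the constructor only counts how often it runs.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a client that is costly to construct.
public sealed class ExpensiveClient
{
    private static int _constructed;
    public static int Constructed => _constructed;

    public ExpensiveClient()
    {
        // A real client would connect to brokers and fetch metadata here.
        Interlocked.Increment(ref _constructed);
    }

    public string Send(string value) => "sent:" + value;
}

public static class SharedClient
{
    // Lazy<T> with a factory defaults to thread-safe, once-only initialization.
    private static readonly Lazy<ExpensiveClient> _instance =
        new Lazy<ExpensiveClient>(() => new ExpensiveClient());

    public static ExpensiveClient Instance => _instance.Value;
}
```

Every caller goes through SharedClient.Instance, so the constructor runs once no matter how many messages are sent or how many threads send them.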

@mhowlett yes, this approach was only for debugging, because with a singleton it also blocked during producing. Now I'll try producing with a singleton again.

@mhowlett I tried producing asynchronously outside the debugger, after installing, and ProduceAsync blocked indefinitely just as before; the config settings didn't help.

hi mhowlett ,

We used a Web Api as Producer.

This API works fine in Debug mode, but after build and deployment on our integration environment ( Windows server / IIS), this api doesn’t produce any messages or logs.

We are in the same case as described by massarakh .

We use Confluent.Kafka (1.0.0-experimental-8)

@mhowlett I've retried it using the sample from https://github.com/confluentinc/confluent-kafka-dotnet/issues/545#issuecomment-399607692 and it worked. Setup was the same, but with the latest 1.0.0-experimental-8 package. It also works with latest stable 0.11.5 (before that I was trying it with 0.11.4). So I can say that producing messages in NUnit tests is fixed.

Hello
I'm experiencing the same issue:
confluent-kafka 0.11.5 (I tried experimental-8 as well)
Windows 10 x64, Console App .Net 4.5
The task stays in the WaitingForActivation status.
The Kafka server is already in use: we are already able to produce and consume documents using Python and Node.js clients.
I also tried with a local Kafka server and got the same issue. Here is the code:

        static void Main(string[] args)
        {
            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", "localhost:9092" }
            };

            Console.WriteLine($"Connecting to {config["bootstrap.servers"]}");
            var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8));
            var dr = producer.ProduceAsync("test_kafka", null, "some random value").Result;
            Console.WriteLine($"Delivered '{dr.Value}' to: {dr.TopicPartitionOffset}");
            Console.ReadLine();            
        }

@mathrb this is a different issue, it's blocking on the Task (not Dispose). Most likely the producer can't deliver the message for some reason, possibly you've specified the bootstrap.servers incorrectly. It will time out eventually.

I think the bootstrap.servers is correct, using this python code works :

import time
import uuid
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(100):
    generated_string = uuid.uuid4()
    print("Sending {}".format(generated_string))
    producer.send('test_kafka', str(generated_string).encode(encoding='utf-8'))
    time.sleep(1)

Any idea to find the reason it can't deliver the message?

EDIT: I just subscribed to errors, when using localhost or local IP, the kafka client fails to resolve a strange hostname: machinename.localdomain:9092

EDIT 2: I've just tried with older versions; it works with 0.11.0, 0.11.1 and 0.11.3. I experienced the same issue with 0.11.4 and 0.11.5. I ran my tests against my remote server because localhost is wrongly resolved.

@mathrb I see the same thing as you.
0.11.3 works fine, but 0.11.5 and 1.0.0-experimental-8 show the same issue.

This thread is getting long, and discussion in it refers to a number of different root causes. One of those definitely corresponded to a real issue with the library, not mis-configuration etc and is now resolved. Going forward:

  1. I've just uploaded 1.0-experimental-9 which includes the deadlock fix but implemented a bit differently to 1.0-experimental-8 in order to also address #577.

  2. I'm going to close this issue. If you are still experiencing this problem with 1.0-experimental-9, please open another issue that includes a short, complete example that I can copy/paste and run in order to try to replicate it. If you are sure your issue is exactly the same as one someone else has opened, feel free to add to that issue; otherwise open a new one.

thanks for all your feedback!
