The code in the link below was modified to send 50 messages of 100 characters each. It takes around 50 seconds, which is more than 200 times slower than other brokers running in the same environment.
Basically, the code below is run 50 times:
var deliveryReport = producer.ProduceAsync(topicName, "", messageWith100Characters);
var result = deliveryReport.Result; // synchronously waits for message to be produced.
Client: Confluent.Kafka nuget version: 0.11.4 runs on Windows 7
Server: RHEL 7+ via VirtualBox
Default settings from both Kafka and ZooKeeper are used
This is extremely slow, even for synchronous producing (which is not recommended). I'm unsure why, based on the information provided.
Set "debug" to "protocol" in your producer config and check the ProduceRequest latencies (rtt).
The 1s delay between send requests suggests you have either changed the settings away from the defaults (used in the example you cite) or you are somehow inserting a delay between produce requests. Please provide a complete example demonstrating the problem, otherwise all we can do is speculate ...
I suspect the cause is somewhere in RunEach or MeasureTotalRuntime - but you need to provide a complete self contained example, otherwise I can't tell.
@mhowlett I provided the wrong code. I will provide the correct code, which is from this github project.
Below is the code, modified from the AdvancedProducer example in the Confluent Kafka code examples.
using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
using Confluent.Kafka.Serialization;

namespace Confluent.Kafka.Examples.AdvancedProducer
{
    public class Program
    {
        public static void Main(string[] args)
        {
            //if (args.Length != 2)
            //{
            //    Console.WriteLine("Usage: .. brokerList topicName");
            //    return;
            //}
            string brokerList = "192.168.1.10:9092";// args[0];
            string topicName = "DurableQueue";// args[1];

            /*
            // TODO(mhowlett): allow partitioner to be set.
            var topicConfig = new TopicConfig
            {
                CustomPartitioner = (top, key, cnt) =>
                {
                    var kt = (key != null) ? Encoding.UTF8.GetString(key, 0, key.Length) : "(null)";
                    int partition = (key?.Length ?? 0) % cnt;
                    bool available = top.PartitionAvailable(partition);
                    Console.WriteLine($"Partitioner topic: {top.Name} key: {kt} partition count: {cnt} -> {partition} {available}");
                    return partition;
                }
            };
            */

            var config = new Dictionary<string, object> {
                { "bootstrap.servers", brokerList },
                { "debug", "protocol"}
            };

            using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
            {
                Console.WriteLine("\n-----------------------------------------------------------------------");
                Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
                Console.WriteLine("-----------------------------------------------------------------------");
                Console.WriteLine("To create a kafka message with UTF-8 encoded key/value message:");
                Console.WriteLine("> key value<Enter>");
                Console.WriteLine("To create a kafka message with empty key and UTF-8 encoded value:");
                Console.WriteLine("> value<enter>");
                Console.WriteLine("Ctrl-C to quit.\n");

                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                while (!cancelled)
                {
                    Console.Write("> ");
                    string text="";
                    try
                    {
                        // text = Console.ReadLine();
                    }
                    catch (IOException)
                    {
                        // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
                        break;
                    }
                    if (text == null)
                    {
                        // Console returned null before
                        // the CancelKeyPress was treated
                        break;
                    }

                    var key = "";
                    var val = text;

                    // split line if both key and value specified.
                    int index = text.IndexOf(" ");
                    if (index != -1)
                    {
                        key = text.Substring(0, index);
                        val = text.Substring(index + 1);
                    }

                    Message<string,string> result=null;
                    const string MESSAGE_CONTENT = "123456789001234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901";
                    Console.WriteLine(DateTime.Now);
                    for (int i = 0; i < 10; i++)
                    {
                        var deliveryReport = producer.ProduceAsync(topicName, key, MESSAGE_CONTENT);
                        result = deliveryReport.Result; // synchronously waits for message to be produced.
                    }
                    Console.WriteLine(DateTime.Now);
                    //var deliveryReport = producer.ProduceAsync(topicName, key, val);
                    //var result = deliveryReport.Result; // synchronously waits for message to be produced.
                    Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}");
                }
            }
        }
    }
}
Below is the debug output. It takes 11 seconds to send 10 messages of 100 characters each:
Producer rdkafka#producer-1 producing on topic DurableQueue.
-----------------------------------------------------------------------
To create a kafka message with UTF-8 encoded key/value message:
> key value<Enter>
To create a kafka message with empty key and UTF-8 encoded value:
> value<enter>
Ctrl-C to quit.
> 10/04/2018 00:57:15
7|2018-04-10 00:57:15.373|rdkafka#producer-1|CONNECTED| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Connected (#1)
7|2018-04-10 00:57:15.426|rdkafka#producer-1|FEATURE| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
7|2018-04-10 00:57:15.427|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
7|2018-04-10 00:57:15.437|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Received ApiVersionResponse (v0, 234 bytes, CorrId 1, rtt 0.00ms)
7|2018-04-10 00:57:15.438|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
7|2018-04-10 00:57:15.442|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/bootstrap: Received MetadataResponse (v2, 60 bytes, CorrId 2, rtt 16.00ms)
7|2018-04-10 00:57:16.443|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent MetadataRequest (v2, 39 bytes @ 0, CorrId 3)
7|2018-04-10 00:57:16.447|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received MetadataResponse (v2, 107 bytes, CorrId 3, rtt 0.00ms)
7|2018-04-10 00:57:17.447|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent MetadataRequest (v2, 39 bytes @ 0, CorrId 4)
7|2018-04-10 00:57:17.448|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 5)
7|2018-04-10 00:57:17.450|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received MetadataResponse (v2, 107 bytes, CorrId 4, rtt 0.00ms)
7|2018-04-10 00:57:17.454|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 5, rtt 0.00ms)
7|2018-04-10 00:57:18.455|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 6)
7|2018-04-10 00:57:18.458|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 6, rtt 15.00ms)
7|2018-04-10 00:57:19.458|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 7)
7|2018-04-10 00:57:19.461|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 7, rtt 0.00ms)
7|2018-04-10 00:57:20.462|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 8)
7|2018-04-10 00:57:20.465|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 8, rtt 0.00ms)
7|2018-04-10 00:57:21.466|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 9)
7|2018-04-10 00:57:21.468|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 9, rtt 0.00ms)
7|2018-04-10 00:57:22.469|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 10)
7|2018-04-10 00:57:22.474|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 10, rtt 16.00ms)
7|2018-04-10 00:57:23.475|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 11)
7|2018-04-10 00:57:23.479|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 11, rtt 0.00ms)
7|2018-04-10 00:57:24.480|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 12)
7|2018-04-10 00:57:24.485|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 12, rtt 0.00ms)
7|2018-04-10 00:57:25.485|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 13)
7|2018-04-10 00:57:25.488|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 13, rtt 15.00ms)
7|2018-04-10 00:57:26.489|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 14)
7|2018-04-10 00:57:26.493|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 14, rtt 0.00ms)
10/04/2018 00:57:26
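The stall is visible directly in the timestamps above: each ProduceResponse arrives within a few milliseconds of its request, but the next ProduceRequest only goes out about a second later. Below is a minimal sketch for measuring that idle gap, assuming the log format shown above (pipe-separated fields with the timestamp in the second field). The class and method names are illustrative only and are not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class ProduceGapAnalyzer
{
    // Returns the idle time in milliseconds between each ProduceResponse
    // and the ProduceRequest that follows it.
    public static List<double> IdleGapsMs(IEnumerable<string> logLines)
    {
        var gaps = new List<double>();
        DateTime? lastResponse = null;

        foreach (var line in logLines)
        {
            var fields = line.Split('|');
            if (fields.Length < 5) continue; // not a debug log line

            DateTime timestamp;
            if (!DateTime.TryParseExact(fields[1], "yyyy-MM-dd HH:mm:ss.fff",
                    CultureInfo.InvariantCulture, DateTimeStyles.None, out timestamp))
                continue; // second field is not a timestamp

            if (line.Contains("Received ProduceResponse"))
            {
                lastResponse = timestamp;
            }
            else if (line.Contains("Sent ProduceRequest") && lastResponse.HasValue)
            {
                gaps.Add((timestamp - lastResponse.Value).TotalMilliseconds);
                lastResponse = null;
            }
        }
        return gaps;
    }

    public static void Main()
    {
        // Two consecutive lines copied from the debug output above.
        var sample = new[]
        {
            "7|2018-04-10 00:57:17.454|rdkafka#producer-1|RECV| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Received ProduceResponse (v3, 48 bytes, CorrId 5, rtt 0.00ms)",
            "7|2018-04-10 00:57:18.455|rdkafka#producer-1|SEND| [thrd:192.168.1.10:9092/bootstrap]: 192.168.1.10:9092/0: Sent ProduceRequest (v3, 231 bytes @ 0, CorrId 6)",
        };
        foreach (var gap in IdleGapsMs(sample))
        {
            Console.WriteLine($"{gap} ms idle before the next ProduceRequest");
        }
    }
}
```

For the two sample lines the gap is 1001 ms, which points at a fixed wait on the client side rather than at broker latency.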
I was able to replicate and reducing socket.blocking.max.ms solved the problem. I only see this behavior on Windows. @edenhill can you comment ...
https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency
The control loop in librdkafka makes use of file descriptor events to wake up in a timely fashion. This is unfortunately not supported out-of-the-box on Windows, and until we've ported that part of the code over you'll need to set socket.blocking.max.ms as a workaround.
@edenhill @mhowlett
When I tested it last time, a similar issue (very slow) occurred when the code ran on RedHat Linux, on the machine where the server is running. Does the issue you mentioned apply to the .NET client running on Linux?
@edenhill @mhowlett
What value should socket.blocking.max.ms be set to in order to get it running? It is not mentioned at the link provided.
Should the issue be marked as a bug instead?
socket.blocking.max.ms will be the lower limit for the producer latency, so set it according to your requirements.
Balance latency vs CPU usage, lower latency - higher CPU usage.
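A rough back-of-the-envelope model shows why socket.blocking.max.ms acts as a floor when producing synchronously. The sketch below assumes that each synchronous produce waits one full socket.blocking.max.ms interval plus one broker round trip before the next message can be sent. The 1000 ms interval (believed to be the librdkafka 0.11.x default) and the 4 ms round trip (read off the debug log above) are assumptions, and the function is illustrative only.

```csharp
using System;

public static class SyncProduceEstimate
{
    // Rough estimate: every synchronous produce waits one blocking interval
    // plus one broker round trip before the next message can be sent.
    public static double TotalMs(int messageCount, double socketBlockingMaxMs, double roundTripMs)
    {
        return messageCount * (socketBlockingMaxMs + roundTripMs);
    }

    public static void Main()
    {
        Console.WriteLine(TotalMs(10, 1000, 4)); // assumed default interval
        Console.WriteLine(TotalMs(10, 1, 4));    // with socket.blocking.max.ms = 1
    }
}
```

Under these assumptions, 10 messages take about 10040 ms with the assumed default and about 50 ms with socket.blocking.max.ms set to 1. The first figure is in line with the roughly 11 seconds reported above.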
@edenhill
Thanks for the advice. I am doing performance testing against other brokers.
Below are the settings used for AdvancedProducer. It is still more than 2 times slower than other brokers with similar functionality.
private static Dictionary<string, object>
new Dictionary<string, object>
{
    { "bootstrap.servers", brokerList},
    { "client.id", "producer" },
    // {"debug", "protocol" },
    {"socket.blocking.max.ms", "1"}, //min=1
    {"queue.buffering.max.ms", "0" }, //set to 0 for immediate transmission, or some other low reasonable value (e.g. 5 ms)
    { "default.topic.config", new Dictionary<string, object>
        {
            { "acks", "1" } //1 by default, globally as opposed to per-topic
        }
    }
};
Similarly, below is the code used for the Consumer. Its speed is similar to the producer's.
private static Dictionary<string, object>
new Dictionary<string, object>
{
    { "group.id", "consumerGroupId"},
    { "enable.auto.commit", false},
    //{ "auto.commit.interval.ms", 5000 },
    //{ "statistics.interval.ms", 60000 },
    //Consumer batch latency - fetch.wait.max.ms - how much time the consumer gives the broker to fill up fetch.min.bytes worth of messages before responding.
    //Setting fetch.wait.max.ms too low (lower than the partition message rate) causes the occasional FetchRequest to return empty before any new messages were seen on the broker, this in turn kicks in the fetch.error.backoff.ms timer that waits that long before the next FetchRequest. So you might want to decrease fetch.error.backoff.ms too.
    //https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency
    { "fetch.wait.max.ms", "0" },
    {"fetch.error.backoff.ms", "0" },
    {"socket.blocking.max.ms", "1"}, //min=1
    { "bootstrap.servers", brokerList },
    { "default.topic.config", new Dictionary<string, object>()
        {
            { "auto.offset.reset", "smallest"}
        }
    }
};
Is there anything I can change in the settings to make it faster?
@mhowlett can you verify the speed using the settings provided in the previous post? It is still very slow.
I just solved the same problem with my Devs... It's this line that creates the wait. Note that the comment itself tells you it's a sync call... In our code this ended up causing us to wait and not push new events to the producer method until it got past the sync block...
result = deliveryReport.Result; // synchronously waits for message to be produced.
By removing this completely we went from less than 1 message a second to over 600. We did not need the socket property to get faster velocity, just to remove that sync call.
that's still low, for e.g. 100 byte messages, max throughput is easily > 200000 msg/s.
But it's significantly better than before, where we had a rate under 1 per second. Our messages are also larger than 100 bytes, more like 4000 bytes +
Anyhow, not looking to debate if we should tune better (we should), just wanted to share that for us the solution was not to use socket.blocking.max.ms, as it appeared to do nothing. But removing the deliveryReport.Result gave us a HUGE speed boost.
@josephxsxn
Thanks for the info.
How do you deal with error responses and message re-sending? For example, you send 1000 messages to the server and all are fine except for no. 55 and 68. How do you identify those messages and resend them? Assume message sending order is not important for your business.
@PingPongSet - you can configure the producer with a very high value for message.send.max.retries and let it worry about the retry logic for you. You can also handle message failures yourself asynchronously.
and yes, generally, you should not produce synchronously, it will be very slow.
"you can also handle messages failures yourself asynchronously."
The question is: does Kafka support this? What if, after the retries, I want different logic to handle the failed message? In that case, how do I tell a failed message apart from a successful one?
@PingPongSet From just looking at the examples, I think the trick is to use ContinueWith. I have not tried this personally, fyi.
The example has this comment near the ContinueWith code, on the object that was blocking before (deliveryReport.Result):
while ((text = Console.ReadLine()) != "q")
{
    var deliveryReport = producer.ProduceAsync(topicName, null, text);
    deliveryReport.ContinueWith(task =>
    {
        Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
    });
}
// Tasks are not waited on synchronously (ContinueWith is not synchronous),
// so it's possible they may still be in progress here.
producer.Flush(TimeSpan.FromSeconds(10));
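On the question of telling failed messages apart from successful ones without blocking, one possible pattern is to attach a continuation to every send and collect the failures for later handling. The sketch below is deliberately independent of the Kafka client: SendAllAsync, the send delegate, and the isFailure predicate are hypothetical names. With Confluent.Kafka 0.11.x you would presumably pass something like m => producer.ProduceAsync(topicName, null, m) as the delegate and inspect the Error property of the returned message in the predicate, but that wiring is an assumption and has not been tested here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncSendTracker
{
    // Starts every send without waiting for the previous one, then returns
    // the messages whose send faulted or whose result was flagged as a failure.
    public static async Task<List<TMessage>> SendAllAsync<TMessage, TResult>(
        IEnumerable<TMessage> messages,
        Func<TMessage, Task<TResult>> send,
        Func<TResult, bool> isFailure)
    {
        var failed = new ConcurrentQueue<TMessage>();

        var pending = messages.Select(message =>
            send(message).ContinueWith(task =>
            {
                // A faulted or canceled task counts as a failure as well.
                if (task.Status != TaskStatus.RanToCompletion || isFailure(task.Result))
                {
                    failed.Enqueue(message);
                }
            })).ToList();

        await Task.WhenAll(pending);
        return failed.ToList();
    }

    public static void Main()
    {
        // Stand-in for a producer: messages 55 and 68 "fail".
        Func<int, Task<bool>> fakeSend = n => Task.FromResult(n != 55 && n != 68);

        var failed = SendAllAsync(Enumerable.Range(1, 1000), fakeSend, ok => !ok)
            .GetAwaiter().GetResult();

        Console.WriteLine(string.Join(", ", failed.OrderBy(n => n)));
    }
}
```

The returned list can then be resent or routed to whatever custom handling is needed. Because sends are not awaited one by one, ordering between messages is not preserved, which matches the assumption stated in the question.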
I'll confirm that changing the socket settings in the client configuration reduced message roundtrip times between two queues and two producers (modified SimpleConsumer example) from 900ms to 10ms.
Actually, I correct this, I'm seeing slightly higher -- I'm seeing around ~15ms from producer send to consumer receive, so the total round trip is roughly double that.
@cmeiklejohn
"I'll confirm that changing the socket settings in the client configuration reduced message roundtrip times between two queues and two producers (modified SimpleConsumer example) from 900ms to 10ms."
What settings do you use on your first?
"Actually, I correct this, I'm seeing slightly higher -- I'm seeing around ~15ms from producer send to consumer receive, so the total round trip is roughly double that."
What do you mean by this?
Please provide a code example to verify.
I have two questions related to the usage of this driver:
Should we expect the workaround for this issue, using the configuration settings around batching, to provide performance similar to the version that takes advantage of the file system events? If not, do you have any thoughts on what the performance difference might be?
In general, what is the expected performance of the dotnet library compared to one of the native libraries, like the Java library, etc.? Order of magnitude? 5x? 2x?
@cmeiklejohn - performance of the .NET client is very good. Maximum throughput depends on parameters, but is generally a bit worse than the Java client (not >= 2x worse).
Note that this issue relates to latency, not throughput - the mechanism librdkafka uses to wake up on both socket io and new produce requests cannot be used on Windows, so socket.blocking.max.ms needs to be set to bound this. It's a large change to fix this (though it's on the roadmap).
adding a note to the readme about this: #574
also https://github.com/edenhill/librdkafka/pull/1870 will help the situation a lot when merged.
Hi, I am facing the same issue: the producer takes too long. I am trying to find a solution and landed on this thread. However, the documentation at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md says that "socket.blocking.max.ms" is deprecated.
Could anyone tell me which config I need to use now? Thanks