Confluent-kafka-dotnet: regex subscription causes librdkafka to fault

Created on 28 Mar 2018 · 28 Comments · Source: confluentinc/confluent-kafka-dotnet

I'm getting reproducible faulting in librdkafka.dll on the consumer side with 0.11.3:

Faulting module name: librdkafka.dll
Exception code: 0xc0000409
Fault offset: 0x00111c45

  • This only started happening once we began tapping into a live stream of data, so we are now testing with some decent volume. Furthermore, it only happens when the consumer is well behind in the topic and is therefore trying to catch up as fast as possible.
  • When the problem happens the process just crashes and it's not possible to debug. The only information available is in the Windows event log; a Windows Error Report is created, but it doesn't look to be of much use.
  • I can reproduce this with the simple code found in the AdvancedConsumer project Poll example; I just point it at my topic and run from the beginning.
  • Once it has crashed the first time, if I restart the consumer it always crashes within 2 minutes, but it's not related to specific messages in the topic, as it will easily get past the offset it previously stopped on.
  • No logs of interest on the broker side.
  • I've tried upgrading (just librdkafka) to 0.11.4-RC2, but no change.

I've been scratching my head for a day experimenting with different things and the only breakthrough I've had is that if I don't output anything to the Console in the OnMessage event handler, everything works fine (entire topic of 1.6 million messages is consumed). What's more bizarre is that I do have "debug" set to "all" so there is plenty of Console output from librdkafka! If I have even just Console.WriteLine("*"); in my message handler, it blows up!

The only errors coming out from the debug log are these (regularly seen, and these come out even when everything is working so I don't think this is the issue):

7|2018-03-28 14:39:45.145|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev03:9092/3]: kafkadev03:9092/3: Protocol parse failure at 1031386/1048631 (rd_kafka_msgset_reader_v2:802) (incorrect broker.version.fallback?)
7|2018-03-28 14:39:45.145|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev03:9092/3]: kafkadev03:9092/3: product_raw [4] MessageSet at offset 396382 payload size 21245 > 17245 remaining bytes
7|2018-03-28 14:39:45.152|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev03:9092/3]: kafkadev03:9092/3: Protocol parse failure at 2094236/2097237 (rd_kafka_msgset_reader_v2:802) (incorrect broker.version.fallback?)
7|2018-03-28 14:39:45.152|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev03:9092/3]: kafkadev03:9092/3: product_raw [1] MessageSet at offset 392741 payload size 27264 > 3001 remaining bytes
7|2018-03-28 14:39:45.167|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev02:9092/2]: kafkadev02:9092/2: Protocol parse failure at 1032453/1048631 (rd_kafka_msgset_reader_v2:802) (incorrect broker.version.fallback?)
7|2018-03-28 14:39:45.168|rdkafka#consumer-1|PROTOERR| [thrd:kafkadev02:9092/2]: kafkadev02:9092/2: product_raw [3] MessageSet at offset 399982 payload size 25974 > 16178 remaining bytes
  • [x] Confluent.Kafka nuget version: 0.11.3
  • [x] Apache Kafka version: 1.0.0 (confluent docker running in Ubuntu)
  • [x] Provide logs (with "debug" : "..." as necessary in configuration)
  • [x] Operating system: Windows 10

MEDIUM bug librdkafka

All 28 comments

what client was used to produce the messages into Kafka?

The same client (Confluent.Kafka 0.11.3) on a different machine.

The messages are Avro+SR if that makes any difference. I have also noticed that it seems to work for a .NET Core Console app (all the crashes have been on .NET Framework Console app), though I do need to run it a few more times to be sure.

@edenhill - the protocol parse failures are worrying to me. do you have any input?

thanks for the detailed writeup @comdw

The protocol parse failures at debug level (7|..) are expected and okay (the broker may send partial responses).

I'm running various other tests and will post observations here:

  • Using .NET Core is rock solid so far, can't reproduce the crash.
  • Offloading the Console output to another thread with Task.Run seems to improve things - it takes up to 10 minutes to crash rather than 1-2 mins. But it does still crash.
  • Doing console output outside of the event handlers (i.e. outside of the poll thread) so far does not crash.
  • To rule out the messages themselves being the issue, I filled another topic with 20 million 1 KB messages using kafka-producer-perf-test and pointed the consumer at that. It still crashed.
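
For anyone who wants to keep console I/O off the poll thread while investigating, here is a minimal sketch of one way to do it. It is not taken from the thread and is not a confirmed fix (a per-message Task.Run offload is reported above to still crash). The handler only enqueues a line, and a single background task does the writing. `ConsoleOffload`, `StartWriter` and the sample loop are hypothetical names for illustration; the loop stands in for the consumer's message handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class ConsoleOffload
{
    // Drains queued lines on one background task so the producing side
    // (e.g. a consumer's message handler) never touches the Console itself.
    public static Task StartWriter(BlockingCollection<string> lines, Action<string> write)
    {
        return Task.Run(() =>
        {
            // GetConsumingEnumerable blocks until an item arrives and ends
            // once CompleteAdding has been called and the queue is empty.
            foreach (var line in lines.GetConsumingEnumerable())
            {
                write(line);
            }
        });
    }

    static void Main()
    {
        // Bounded so a slow console applies back-pressure instead of growing memory.
        var lines = new BlockingCollection<string>(boundedCapacity: 10000);
        var writer = StartWriter(lines, Console.WriteLine);

        // Stand-in for the handler body (assumption): in a real consumer this
        // Add call would replace Console.WriteLine inside the message handler.
        for (int i = 0; i < 5; ++i)
        {
            lines.Add("message " + i);
        }

        lines.CompleteAdding();
        writer.Wait();
    }
}
```

The bounded queue is a deliberate choice: if the console cannot keep up, `Add` blocks rather than letting the backlog grow without limit.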

I didn't mention it before but the topic is using lz4 compression.

I'm experiencing the same. My little test program exits with code 0xc0000409 when consuming a topic with lz4 compression whose messages have plain string keys and values. It just terminates within a few seconds.

With the same data, producer and consumer, but gzip instead of lz4, I don't see any issue.

@raycheung, good to know someone else is seeing this too! Does your test program output to the console?

I'm currently running a test on a topic with no compression and not seeing the issue after about 1 hour. I'll try other compression types to narrow this down a bit.

EDIT: All working with the other compression types, so only seeing this for lz4. I'm still confused about how Console output affects this. At least we have a workaround for now (don't use lz4!).
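
For reference, the workaround amounts to a one-line change on the producer side. The sketch below uses the same dictionary-style configuration that appears in the examples in this thread; `ProducerSettings` and `Create` are hypothetical names, and only the `compression.codec` value differs from the lz4 setup.

```csharp
using System.Collections.Generic;

static class ProducerSettings
{
    // Builds a producer configuration with gzip instead of lz4.
    // The keys are the ones used in this thread; the helper itself is illustrative.
    public static Dictionary<string, object> Create(string bootstrapServers)
    {
        return new Dictionary<string, object>
        {
            { "bootstrap.servers", bootstrapServers },
            { "compression.codec", "gzip" }
        };
    }
}
```

Messages already written with lz4 would generally stay lz4-compressed in the topic, so this only helps for data produced after the change.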

@edenhill, any idea where the fault comes from within librdkafka?

@comdw - I assume when you say you tested with .NET Core, you mean on Windows? It seems like it could be some sort of memory problem in librdkafka related to lz4, in which case the fact that it works on .NET Core and the Console logging behavior may just be random symptoms of this. It would surprise me if there were lz4 issues in librdkafka on Linux, since that is relatively well used, but it seems very plausible on Windows.

I just tried to reproduce with the below, but I see no problem.

  • .NET Framework 4.6.1 console app
  • Confluent.Kafka v1.0-experimental-2 (still based on librdkafka 0.11.4)

Do you see a problem when running the below?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace ConsoleApp10
{
    class Program
    {
        static void Main(string[] args)
        {
            var pconfig = new Dictionary<string, object>
            {
                { "bootstrap.servers", "10.200.7.144:9092" },
                { "compression.codec", "lz4" }
            };

            using (var p = new Producer<string, string>(pconfig, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
            {
                for (int i=0; i<2000000; ++i)
                {
                    p.ProduceAsync("testlz4", new Message<string, string> { Key = i.ToString(), Value = i.ToString() + i.ToString() + "sdfs" });
                }
            }

            var cconfig = new Dictionary<string, object>
            {
                { "bootstrap.servers", "10.200.7.144:9092" },
                { "group.id", Guid.NewGuid().ToString() },
                { "auto.offset.reset", "beginning" }
            };

            using (var c = new Consumer<string, string>(cconfig, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
            {
                c.Subscribe("testlz4");

                c.OnRecord += (_, r)
                    => Console.WriteLine(r.Key + " " + r.Value);

                c.OnPartitionEOF += (_, e)
                    => Console.WriteLine("end!");

                while (true)
                {
                    c.Poll(TimeSpan.FromSeconds(10));
                }
            }
        }
    }
}

@mhowlett, yes, all the console apps were running on Windows, including the .NET Core one. Incidentally, my Framework console app is using 4.7.1.

I've run your code in a 4.6.1 console app and didn't get the problem. However I have had to wait for over 10 minutes to see the issue previously so I'm going to run a longer test. I'll also narrow down further by ruling out other differences between your test and mine (one other difference is message size - my messages average 1000 bytes).

Strange, I'm able to run the following sample fine with .NET 4.7.1 on Windows. Previously I was consuming a topic with an lz4 payload produced by another Java program running on Linux (i.e. not using the same library).

https://github.com/raycheung/KafkaTest-dotnet/blob/master/KafkaTest/Program.fs

open System
open System.Text
open System.Threading
open Confluent.Kafka
open Confluent.Kafka.Serialization

[<EntryPoint>]
let main argv =
    printfn "%A" argv

    let uuid = Guid.NewGuid().ToString()
    let servers = argv |> Array.toList
    let topic = sprintf "fsharp-testing-topic-%s" uuid
    let pollTimeout = TimeSpan.FromMilliseconds 500.0

    printfn "topic=%s" topic

    let pConfig =
        Map [
            "bootstrap.servers", servers :> obj
            "compression.codec", "lz4" :> obj
        ]

    use producer = new Producer<_, _> (pConfig, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8))
    let pTasks =
        [for i in 1 .. 100000 ->
            let s = i * i
            let vs = [for _ in 1 .. 100 -> sprintf "square=%d;" s]
            producer.ProduceAsync (topic, i.ToString(), String.Concat vs)]


    let cConfig =
        Map [
            "bootstrap.servers", servers :> obj
            "group.id", "ff-data-" + uuid :> obj
            "auto.offset.reset", "earliest" :> obj
        ]

    let loop =
        async {
            use consumer = new Consumer<_,_>(cConfig, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8))
            consumer.Subscribe topic
            consumer.OnMessage.Add (fun msg -> printfn "key=%s" msg.Key)
            while true do
                consumer.Poll pollTimeout
        }

    Async.RunSynchronously(loop, 600000)

    0 // return an integer exit code

@mhowlett, I confirmed that upgrading my original console app to Confluent.Kafka v1.0-experimental-2 didn't improve matters, nor did targeting 4.6.1. It also crashed when consuming the testlz4 topic generated by your example, after a longer period of time.

Also I finally got your example to crash (same fault) but only when consuming from the topic I had previously created and also it took almost an hour to crash! I think this rules out issues with the data produced and the topic itself, but a larger message size increases the likelihood of seeing the issue earlier.

One problem with reproducing this is that there is a seemingly random element. It's like there is a 0.000001% chance of failure on each message consumed, so if you are very lucky you might not see the crash at all. Other times it crashes within seconds.

So anyway, I modified your example to make the messages longer and tweaked the output a little, and this crashed only 15 seconds into consumption. After it has crashed once, you can comment out the producer stage and re-run against the same topic (change the consumer group id or reset the offsets if it manages to get to the end of the topic). I hope this modified version will allow you to reproduce it.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace ConsoleApp10
{
    class Program
    {
        private static Stopwatch _stopwatch = new Stopwatch();

        static void Main(string[] args)
        {
            var pconfig = new Dictionary<string, object>
            {
                { "bootstrap.servers", "kafkadev-cluster:9092" },
                { "compression.codec", "lz4" }
            };

            using (var p = new Producer<string, string>(pconfig, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
            {
                for (int i = 0; i < 5000000; ++i)
                {
                    Console.Write($"Producing...{i}\r");
                    p.ProduceAsync("testlz4_1k", new Message<string, string> { Key = i.ToString(), Value = i.ToString() + i.ToString() + "VNtW0WEfsEaapVPE89HylYjds9vgsdyaRPZ59IIDdxzWagp55DACChbf8KqjlSXBfjWPZcueVRkGJcaZofr8UdnvZozYcMGOefFRcNCx2xlZ5jly0VwaAC0AD3eaeUY2fMpFnH4jY2gGIQw9bIkdqzNUL69JATuZascivaP9vwKMp2bxkr5RPTpGY098VYd0wqTq2r4ZwM4ulE2pqz56F0SzSzG3IwZQnpj2VKIapgTSL6A7p5pQ9FdTTgmCSLP6dkeHwyziW4WFXzU8p5u7JlIslzFindPNsd6LktHF4aqYY4fCuY82qksdwqKRvX4iJpE3DWesq4H7PIhery5kYCsB1GrlR77M4fYalQ95GIGLj62tsKg5bHTaKnxlYJwS3kQe8SAmnP8abVVNQ6rfWcGPxBJ8WTMiT9IzUl0bICdb2k0u6K7XvsvenDz5ihXugkSVAxFWBP9OTe8EVgpQhIVfNNNcVTuRRCcadGsNjKkJwCMbrXIgHheDgIm4DK3bu3FMeoovalCuDfzPYFBmJ0VCIsheJ5ZZGk1hPHjDU2tF663iaanUzFBdE0LBgx89DIUoaTksJiPMjwQlwXEFWOX9qfddQDZBCqKoNh6yW9jAYd7PE0iAHKx6t1CW5k22BLFyoc9mpzjAJOLMFqKjFCeEhiUbLTD4Q9863UUrrd1EwK7Tsep1QHJU33K6YCGA16QBjhxoSYDq2qalyE6DqFotVUXF5wIOx5F0rYtu1Vjx4xG7zflrUE9XsHXFPt1CgCRTWumnslFXzg4ScAr9497nXuPxNvSUA7KC2TAB4ATwUEDFcrfoIDk5JGpLaxRQ01ujhR5cgruypQNlQ5yzE9hwWiWgrxh0WTj8DNzUBliaikU3UIvJ9oNUkkLNLvhzyxuTR0I3UvHMhCDjqnYX6y0WXckGhUn2PfsMVNjN8bfRQ2BlDR2tEoGMastB20vBB3EKH9cJBJ5dDEHa8JHPzp8vXjc5SlhiivMR3AZMdpxubTBRTelMsc49LJHpfIH" });
                }
            }

            var cconfig = new Dictionary<string, object>
            {
                { "bootstrap.servers", "kafkadev-cluster:9092" },
                { "group.id", Guid.NewGuid().ToString() },
                { "auto.offset.reset", "beginning" }
            };

            using (var c = new Consumer<string, string>(cconfig, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
            {
                _stopwatch.Start();

                c.Subscribe("testlz4_1k");

                c.OnRecord += (_, r)
                    => Console.WriteLine(_stopwatch.Elapsed + " " + r.Partition + "," + r.Offset + " " + r.Value.Substring(0, 20));

                c.OnPartitionEOF += (_, e)
                    => Console.WriteLine("end!");

                while (true)
                {
                    c.Poll(TimeSpan.FromSeconds(10));
                }
            }
        }
    }
}

On my machine (Windows 10 running in a VMware virtual machine on a MBP, with Kafka running on the host) the above consumed all 5M messages in ~10 minutes with no problem. I'll let this run with increasingly larger numbers of messages throughout today and see if I can hit the problem. Thanks @comdw for reporting and for your hard work debugging / setting up the test case.

And thanks @raycheung for the F# example, I was thinking just last night we need a better F# example... Do you have any feedback on the API from the perspective of F#? Can you think of anything important that doesn't work well in that language?

Hmm, that's frustrating. Just tried this on a Hyper-V Windows 10 VM and don't see the issue either. We've had the issue on Windows Server 2012 R2 however (running on a cloud hosted VM).

Update: I had this running for > 4 hrs yesterday, and the problem did not occur.
Which cloud provider? I will hopefully get a chance to run some more tests ...

Rackspace

@mhowlett I'm not yet an F# expert and definitely can't speak for anyone else. :smiley: I found the API straightforward enough, and it plays nicely with my F# code. An asynchronous consumer would make it perfect, though; that is tracked here: https://github.com/confluentinc/confluent-kafka-dotnet/issues/487.

@raycheung - same comment as #487 - feel free to jot down any pseudo code as to what you think such an API should look like :-), I'll take note.

Running into the same issue with the Consumer. On .NET 4.6.1, with one instance of the consumer using Poll in a Windows service, it works fine. If we increase the number of consumer instances to more than one, it fails after a few seconds, or immediately while starting the service. If you try to start it a number of times, once in a while it starts fine and then fails after a while, anywhere from a few seconds to minutes.

Initially I was using 0.11.4, then tried 0.11.1; both show the same behavior. @mhowlett do we need to use a different API in the consumer other than Poll?

Server: Windows Server 2012 R2 standard.

@comdw With Windows 10 (0.11.1) it does not occur, but with 2012 R2 it happens very frequently.

@mhowlett @comdw
We have nailed down what was causing this behavior for us. We were passing a regex to the Subscribe method to match a list of topics, and that causes heap corruption (the above ntdll exception) when there is more than one consumer instance within a process.

The issue goes away by not using Regex pattern and specifying the topics explicitly.
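
A minimal sketch of that workaround, assuming the list of topic names is already available to the application (the names below are made up): filter the names against the pattern on the client side and subscribe to the resulting explicit list. `TopicFilter` and `Match` are hypothetical helpers, the pattern is the one quoted later in this thread, and .NET's regex engine is assumed to agree with librdkafka's for a pattern this simple.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

static class TopicFilter
{
    // Returns the topic names matching the pattern, so they can be passed
    // to Subscribe as an explicit list instead of a regex subscription.
    public static List<string> Match(IEnumerable<string> topics, string pattern)
    {
        var regex = new Regex(pattern);
        return topics.Where(t => regex.IsMatch(t)).ToList();
    }

    static void Main()
    {
        // Hypothetical topic names; in practice these would come from the cluster.
        var allTopics = new[] { "spot-prices", "sports", "orders", "spo" };

        var matched = Match(allTopics, "^spo.");

        // prints: spot-prices, sports
        // ("spo" itself does not match because "." requires one more character)
        Console.WriteLine(string.Join(", ", matched));

        // The explicit list would then replace the regex in the Subscribe call.
    }
}
```

A static list will not pick up topics created after startup, so the filter would need to be re-run to notice new topics.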

cc @edenhill

@rsethu Can you provide your original regexps and some example (matched) topic names?

We have the same problem on a physical Windows 10 machine with 0.11.4.
The problem arises quickly if the consumer is lagging far behind, while it is harder to reproduce if the consumer has only a few messages to consume.
Switching from lz4 to gzip solved the problem.

@edenhill The regex we had was "^spo."

It's 2020! Still not fixed?

Can you provide your original regexps and some example (matched) topic names?

I saw it as an open issue. That's why I'm asking if there is any progress.

@Nadav-Revuze If you are experiencing this issue please provide your regexp, the list of topics in the cluster (needed to reproduce), your client and OS version.
