Confluent-kafka-dotnet: Cannot produce / consume tombstone message

Created on 24 Apr 2019 · 4 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

You cannot produce or consume a null (tombstone) message with the AvroSerializer. To produce a null message, you need a dedicated producer whose value serializer is set to Serializers.Null.

So far I have not found a way to consume these null messages with the AvroDeserializer. Attempting it currently throws the following exception:

Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> System.IO.EndOfStreamException: Unable to read beyond the end of the stream.
   at System.IO.BinaryReader.ReadByte()
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.<>c__DisplayClass71_0.<<ConsumeViaBytes>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Confluent.Kafka.Consumer`2.ConsumeViaBytes(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---

How to reproduce

Use a specific producer for building tombstone messages:

var tombstoner = new ProducerBuilder<int, Null>(_kafkaConfiguration.ProducerConfiguration)
    .SetKeySerializer(new AvroSerializer<int>(_schemaRegistryClient))
    .SetValueSerializer(Serializers.Null)
    .Build();

var tasks = properties.Select(property => tombstoner.ProduceAsync("test-topic",
    new Message<int, Null>
    {
        Key = 1,
        Value = null,
        Timestamp = Timestamp.Default
    }));

The consumer code follows the examples on the GitHub homepage; it is currently the simple consumer example, with a single partition, etc. Non-null messages are handled fine, but a ConsumeException is thrown for the null messages.

Checklist

Please provide the following information:

  • [Y] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [1.0.0-RC3] Confluent.Kafka nuget version.
  • [Kafka 2.0.1-L0 (Landoop)] Apache Kafka version.
  • [N] Client configuration.
  • [Windows 10 x64] Operating system.
  • [N] Provide logs (with "debug" : "..." as necessary in configuration).
  • [N] Provide broker log excerpts.
  • [N] Critical issue.
enhancement

All 4 comments

Thanks for pointing that out. In the Java Avro serdes, null is handled as a special case, but the .NET serdes don't have that logic. As a workaround for now, I suggest wrapping our Avro serdes and handling null in your wrapper (it will be a small number of lines of code).

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L56
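
The wrapper workaround suggested above could look roughly like the following. This is a minimal sketch, not code from the library: the class names NullTolerantDeserializer and NullTolerantSerializer are hypothetical, and it assumes the IAsyncDeserializer<T> / IAsyncSerializer<T> interfaces as they appear in the stack traces in this thread. It also assumes the producer treats a null byte array returned by a serializer as a null payload, which is how Serializers.Null appears to behave.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical wrapper: returns null for tombstones instead of passing
// an empty/null buffer to the wrapped (e.g. Avro) deserializer.
public sealed class NullTolerantDeserializer<T> : IAsyncDeserializer<T>
    where T : class
{
    private readonly IAsyncDeserializer<T> _inner;

    public NullTolerantDeserializer(IAsyncDeserializer<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public Task<T> DeserializeAsync(
        ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        // Tombstone: short-circuit before the inner deserializer sees it.
        if (isNull)
        {
            return Task.FromResult<T>(null);
        }

        return _inner.DeserializeAsync(data, isNull, context);
    }
}

// Hypothetical counterpart for producing: a null value is emitted as a
// null payload (assumption: the producer sends null bytes as a tombstone).
public sealed class NullTolerantSerializer<T> : IAsyncSerializer<T>
    where T : class
{
    private readonly IAsyncSerializer<T> _inner;

    public NullTolerantSerializer(IAsyncSerializer<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        if (data == null)
        {
            return Task.FromResult<byte[]>(null);
        }

        return _inner.SerializeAsync(data, context);
    }
}
```

To use it, pass the wrapped instance wherever the AvroDeserializer or AvroSerializer was passed before, for example new NullTolerantDeserializer<MyRecord>(new AvroDeserializer<MyRecord>(schemaRegistryClient)), where MyRecord stands in for your generated Avro class. Depending on the client version, the consumer builder may require a synchronous deserializer, in which case the wrapper would need to be adapted with AsSyncOverAsync() from the Confluent.Kafka.SyncOverAsync namespace. The class constraint means this sketch only covers reference-type values, which is the usual case for generated Avro records.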

I'm having a related issue.

I'm trying to consume a topic tombstone (serialized key, null value), and the AvroDeserializer breaks with "Unhandled consumer error: Local: Value deserialization error". The inner exception is a System.ArgumentNullException with the message "Buffer cannot be null."

I'm logging errors, and the log confirms that the message itself was received correctly:

"Message": {
      "Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
      "Value": null,
      "Timestamp": {
        "Type": 1,
        "UnixTimestampMs": 1558523287993,
        "UtcDateTime": "2019-05-22T11:08:07.993Z"
      },
      "Headers": []
    }

Full Log:

[%TOPIC% ]Unhandled consumer error: Local: Value deserialization error
[%TOPIC% ]Unhandled exception: {
  "ConsumerRecord": {
    "Topic": "%TOPIC%",
    "Partition": {
      "Value": 1,
      "IsSpecial": false
    },
    "Offset": {
      "Value": 38675,
      "IsSpecial": false
    },
    "TopicPartition": {
      "Topic": "%TOPIC%",
      "Partition": {
        "Value": 1,
        "IsSpecial": false
      }
    },
    "TopicPartitionOffset": {
      "Topic": "%TOPIC%",
      "Partition": {
        "Value": 1,
        "IsSpecial": false
      },
      "Offset": {
        "Value": 38675,
        "IsSpecial": false
      },
      "TopicPartition": {
        "Topic": "%TOPIC%",
        "Partition": {
          "Value": 1,
          "IsSpecial": false
        }
      }
    },
    "Message": {
      "Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
      "Value": null,
      "Timestamp": {
        "Type": 1,
        "UnixTimestampMs": 1558523287993,
        "UtcDateTime": "2019-05-22T11:08:07.993Z"
      },
      "Headers": []
    },
    "Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
    "Value": null,
    "Timestamp": {
      "Type": 1,
      "UnixTimestampMs": 1558523287993,
      "UtcDateTime": "2019-05-22T11:08:07.993Z"
    },
    "Headers": [],
    "IsPartitionEOF": false
  },
  "Error": {
    "Code": -159,
    "IsFatal": false,
    "Reason": "Local: Value deserialization error",
    "IsError": true,
    "IsLocalError": true,
    "IsBrokerError": false
  },
  "Message": "Local: Value deserialization error",
  "Data": {
    "topic": "%TOPIC%"
  },
  "InnerException": {
    "ClassName": "System.ArgumentNullException",
    "Message": "Buffer cannot be null.",
    "Data": {},
    "InnerException": null,
    "HelpURL": null,
    "StackTraceString": "   at System.IO.MemoryStream..ctor(Byte[] buffer, Boolean writable)\r\n   at System.IO.MemoryStream..ctor(Byte[] buffer)\r\n   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl'1.<Deserialize>d__8.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Confluent.SchemaRegistry.Serdes.AvroDeserializer'1.<DeserializeAsync>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer'1.Deserialize(ReadOnlySpan'1 data, Boolean isNull, SerializationContext context)\r\n   at Confluent.Kafka.Consumer'2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer'1 keyDeserializer, IDeserializer'1 valueDeserializer)",
    "RemoteStackTraceString": null,
    "RemoteStackIndex": 0,
    "ExceptionMethod": "1\n.ctor\nmscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089\nSystem.IO.MemoryStream\nVoid .ctor(Byte[], Boolean)",
    "HResult": -2147467261,
    "Source": "mscorlib",
    "WatsonBuckets": null,
    "ParamName": "buffer"
  },
  "TargetSite": {
    "Name": "ConsumeImpl",
    "AssemblyName": "Confluent.Kafka, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
    "ClassName": "Confluent.Kafka.Consumer'2",
    "Signature": "Confluent.Kafka.ConsumeResult'2[K,V] ConsumeImpl[K,V](Int32, Confluent.Kafka.IDeserializer'1[K], Confluent.Kafka.IDeserializer'1[V])",
    "Signature2": "Confluent.Kafka.ConsumeResult'2[K,V] ConsumeImpl[K,V](System.Int32, Confluent.Kafka.IDeserializer'1[K], Confluent.Kafka.IDeserializer'1[V])",
    "MemberType": 8,
    "GenericArguments": null
  },
  "StackTrace": "   at Confluent.Kafka.Consumer'2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer'1 keyDeserializer, IDeserializer'1 valueDeserializer)\r\n   at Confluent.Kafka.Consumer'2.Consume(CancellationToken cancellationToken)\r\n   at CS.DCM.Kafka.Business.Consume.ConsumerBase'2.Consume(IConsumer'2 consumer, Action'1 progressFeedback, Action'1 errorFeedback, Action'1 deleteFeedback, IConsumptionTarget'2 repo, CancellationToken token) in %LOCALPROJECT%\\Consume\\ConsumerBase.cs:line 115",
  "HelpLink": null,
  "Source": "Confluent.Kafka",
  "HResult": -2146233088
}

We are facing a similar issue where we rely on tombstone messages as well to indicate deletes on a topic. Would love to see this enhancement gain some traction.

Thanks for pointing that out. In the Java Avro serdes, null is handled as a special case, but the .NET serdes don't have that logic. As a workaround for now, I suggest wrapping our Avro serdes and handling null in your wrapper (it will be a small number of lines of code).

Is there an ETA on when this will be fixed? We're trying to consume data from a topic created by Debezium. It works fine for normal messages but tombstones result in the following exception.

Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> System.ArgumentNullException: Buffer cannot be null.
Parameter name: buffer
   at System.IO.MemoryStream..ctor(Byte[] buffer, Boolean writable)
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
   at TCGplayer.Foundation.EventBus.Kafka.KafkaEventSubscriber`2.<>c__DisplayClass19_0.<ExecuteAsync>b__0()

Looks like the problem is here.
