You cannot produce or consume a null message with the AvroSerializer. To produce a null message, a dedicated producer has to be used with the value serializer set to Serializers.Null.
So far, using the AvroDeserializer, I have not found a way to consume these null messages. When I try, I get the following exception:
Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> System.IO.EndOfStreamException: Unable to read beyond the end of the stream.
at System.IO.BinaryReader.ReadByte()
at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
at Confluent.Kafka.Consumer`2.<>c__DisplayClass71_0.<<ConsumeViaBytes>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Confluent.Kafka.Consumer`2.ConsumeViaBytes(Int32 millisecondsTimeout)
--- End of inner exception stack trace ---
Use a specific producer for building tombstone messages:
var tombstoner = new ProducerBuilder<int, Null>(_kafkaConfiguration.ProducerConfiguration)
    .SetKeySerializer(new AvroSerializer<int>(_schemaRegistryClient))
    .SetValueSerializer(Serializers.Null)
    .Build();
var tasks = properties.Select(property => tombstoner.ProduceAsync("test-topic",
    new Message<int, Null>
    {
        Key = 1,
        Value = null,
        Timestamp = Timestamp.Default
    }));
The consumer code follows the simple consumer example on the GitHub homepage (single partition, etc.). The non-null messages are handled fine; a ConsumeException is thrown only for the null messages.
Thanks for pointing that out. In the Java Avro serdes, null is handled as a special case, but the .NET serdes don't have that logic. As a workaround for now, I suggest wrapping our Avro serdes and handling null in your wrapper (it will only take a few lines of code).
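A minimal sketch of what such a wrapper could look like, assuming the IAsyncDeserializer<T> interface whose DeserializeAsync signature appears in the stack trace above. The class name NullTolerantDeserializer is hypothetical and not part of the library; the wrapper only short-circuits null payloads and delegates everything else to the wrapped deserializer.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical wrapper (not part of Confluent.Kafka): returns null for a
// tombstone instead of handing the missing payload to the inner deserializer.
public sealed class NullTolerantDeserializer<T> : IAsyncDeserializer<T> where T : class
{
    private readonly IAsyncDeserializer<T> _inner;

    public NullTolerantDeserializer(IAsyncDeserializer<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        // A tombstone arrives with isNull == true; the inner deserializer is never called for it.
        if (isNull)
        {
            return Task.FromResult<T>(null);
        }

        // Every non-null payload is passed through unchanged.
        return _inner.DeserializeAsync(data, isNull, context);
    }
}
```

On the consumer side, this would presumably be used by wrapping an AvroDeserializer<T> instance and passing the result, via AsSyncOverAsync(), to SetValueDeserializer. The `where T : class` constraint is a design choice of this sketch so that null is a valid return value; value types would need a different representation for "no value".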
I'm having a related issue.
I'm trying to consume a topic tombstone (serialized key, null value), and the AvroDeserializer fails with "Unhandled consumer error: Local: Value deserialization error". The inner exception is a System.ArgumentNullException with the message "Buffer cannot be null."
I'm logging errors, and the message received was indeed correct:
"Message": {
"Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
"Value": null,
"Timestamp": {
"Type": 1,
"UnixTimestampMs": 1558523287993,
"UtcDateTime": "2019-05-22T11:08:07.993Z"
},
"Headers": []
}
[%TOPIC% ]Unhandled consumer error: Local: Value deserialization error
[%TOPIC% ]Unhandled exception: {
"ConsumerRecord": {
"Topic": "%TOPIC%",
"Partition": {
"Value": 1,
"IsSpecial": false
},
"Offset": {
"Value": 38675,
"IsSpecial": false
},
"TopicPartition": {
"Topic": "%TOPIC%",
"Partition": {
"Value": 1,
"IsSpecial": false
}
},
"TopicPartitionOffset": {
"Topic": "%TOPIC%",
"Partition": {
"Value": 1,
"IsSpecial": false
},
"Offset": {
"Value": 38675,
"IsSpecial": false
},
"TopicPartition": {
"Topic": "%TOPIC%",
"Partition": {
"Value": 1,
"IsSpecial": false
}
}
},
"Message": {
"Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
"Value": null,
"Timestamp": {
"Type": 1,
"UnixTimestampMs": 1558523287993,
"UtcDateTime": "2019-05-22T11:08:07.993Z"
},
"Headers": []
},
"Key": "AAAAAm8AFENVV1NQSUNVUjEA8AQ=",
"Value": null,
"Timestamp": {
"Type": 1,
"UnixTimestampMs": 1558523287993,
"UtcDateTime": "2019-05-22T11:08:07.993Z"
},
"Headers": [],
"IsPartitionEOF": false
},
"Error": {
"Code": -159,
"IsFatal": false,
"Reason": "Local: Value deserialization error",
"IsError": true,
"IsLocalError": true,
"IsBrokerError": false
},
"Message": "Local: Value deserialization error",
"Data": {
"topic": "%TOPIC%"
},
"InnerException": {
"ClassName": "System.ArgumentNullException",
"Message": "Buffer cannot be null.",
"Data": {},
"InnerException": null,
"HelpURL": null,
"StackTraceString": " at System.IO.MemoryStream..ctor(Byte[] buffer, Boolean writable)\r\n at System.IO.MemoryStream..ctor(Byte[] buffer)\r\n at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl'1.<Deserialize>d__8.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n at Confluent.SchemaRegistry.Serdes.AvroDeserializer'1.<DeserializeAsync>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer'1.Deserialize(ReadOnlySpan'1 data, Boolean isNull, SerializationContext context)\r\n at Confluent.Kafka.Consumer'2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer'1 keyDeserializer, IDeserializer'1 valueDeserializer)",
"RemoteStackTraceString": null,
"RemoteStackIndex": 0,
"ExceptionMethod": "1\n.ctor\nmscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089\nSystem.IO.MemoryStream\nVoid .ctor(Byte[], Boolean)",
"HResult": -2147467261,
"Source": "mscorlib",
"WatsonBuckets": null,
"ParamName": "buffer"
},
"TargetSite": {
"Name": "ConsumeImpl",
"AssemblyName": "Confluent.Kafka, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"ClassName": "Confluent.Kafka.Consumer'2",
"Signature": "Confluent.Kafka.ConsumeResult'2[K,V] ConsumeImpl[K,V](Int32, Confluent.Kafka.IDeserializer'1[K], Confluent.Kafka.IDeserializer'1[V])",
"Signature2": "Confluent.Kafka.ConsumeResult'2[K,V] ConsumeImpl[K,V](System.Int32, Confluent.Kafka.IDeserializer'1[K], Confluent.Kafka.IDeserializer'1[V])",
"MemberType": 8,
"GenericArguments": null
},
"StackTrace": " at Confluent.Kafka.Consumer'2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer'1 keyDeserializer, IDeserializer'1 valueDeserializer)\r\n at Confluent.Kafka.Consumer'2.Consume(CancellationToken cancellationToken)\r\n at CS.DCM.Kafka.Business.Consume.ConsumerBase'2.Consume(IConsumer'2 consumer, Action'1 progressFeedback, Action'1 errorFeedback, Action'1 deleteFeedback, IConsumptionTarget'2 repo, CancellationToken token) in %LOCALPROJECT%\\Consume\\ConsumerBase.cs:line 115",
"HelpLink": null,
"Source": "Confluent.Kafka",
"HResult": -2146233088
}
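Until the deserializer handles null itself, a consumer could in principle recognize this case from the exception alone. The following is a sketch under assumptions taken from the log above: that ConsumeException exposes the raw record as ConsumerRecord, that its Message.Value is null for a tombstone, and that error code -159 corresponds to ErrorCode.Local_ValueDeserialization. The TombstoneDetection class and IsValueTombstone method are hypothetical names, not library API.

```csharp
using Confluent.Kafka;

public static class TombstoneDetection
{
    // Hypothetical helper: true when a ConsumeException was most likely caused
    // by a null value (tombstone) rather than by a corrupt payload.
    public static bool IsValueTombstone(ConsumeException ex)
    {
        return ex.Error.Code == ErrorCode.Local_ValueDeserialization
            && ex.ConsumerRecord?.Message != null
            && ex.ConsumerRecord.Message.Value == null;
    }
}
```

A consume loop could call this inside its `catch (ConsumeException ex)` block and treat a `true` result as a delete for the key bytes in `ex.ConsumerRecord.Message.Key`. Note that the key is only available in raw, still-serialized form there, so wrapping the deserializer is likely the cleaner option.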
We are facing a similar issue where we rely on tombstone messages as well to indicate deletes on a topic. Would love to see this enhancement gain some traction.
thanks for pointing that out. in the java avro serdes, null is handled as a special case, but in the .net serdes, we don't have that logic. as a workaround for now, I suggest just wrapping our avro serdes and handling null in your wrapper (it will be a small number of loc).
Is there an ETA on when this will be fixed? We're trying to consume data from a topic created by Debezium. It works fine for normal messages but tombstones result in the following exception.
Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> System.ArgumentNullException: Buffer cannot be null.
Parameter name: buffer
at System.IO.MemoryStream..ctor(Byte[] buffer, Boolean writable)
at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
--- End of inner exception stack trace ---
at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
at TCGplayer.Foundation.EventBus.Kafka.KafkaEventSubscriber`2.<>c__DisplayClass19_0.<ExecuteAsync>b__0()
Looks like the problem is here.