Describe the bug
We intermittently get a MessageSizeExceededException when attempting to send an EventDataBatch from one of our production services. From testing, I think this occurs when we retry sending an EventDataBatch while diagnostic tracing is enabled (specifically, we use Application Insights, which subscribes to the "Microsoft.Azure.EventHubs" diagnostic source).
Expected behavior
No MessageSizeExceededException is thrown when we retry sending a batch.
Actual behavior (include Exception or Stack Trace)
The exception we get is:
MessageSizeExceededException: The received message (delivery-id:0, size:1068766 bytes) exceeds the limit (1046528 bytes) currently allowed on the link.
Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender+d__10.MoveNext():445
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender+d__10.MoveNext():766
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventDataSender+d__7.MoveNext():233
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventHubClient+d__22.MoveNext():222
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventHubClient+d__23.MoveNext():149
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
TrafficService.Services.AzureEventHubService+d__12.MoveNext() in D:\home\site\repository\TrafficService\Services\AzureEventHubService.cs:148
We added logging that writes out the batch's state (using reflection) after a failed send. It shows that the batch's internal state considers the total size smaller than the maximum size, as expected: the Diagnostic-Id header was not counted when the batch was built, because it is added only during the call to SendAsync.
Batch failed to be sent to event hub. Number of items 273, MaxSize (bytes) = 1048576, CurrentSize (bytes) = 1045428
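As a rough check on the reported numbers, here is the arithmetic, assuming the received message size in the exception and the logged CurrentSize describe the same 273-item batch, and attributing the whole difference to the added header (an assumption; other encoding overhead may also change):

```latex
\begin{aligned}
\Delta_{\text{total}} &= 1068766 - 1045428 = 23338 \text{ bytes} \\
\Delta_{\text{per event}} &= \frac{23338}{273} \approx 85.5 \text{ bytes} \\
\text{headroom} &= 1046528 - 1045428 = 1100 \text{ bytes} \approx \frac{1100}{273} \approx 4.0 \text{ bytes per event}
\end{aligned}
```

So in this batch, any header that adds more than about 4 bytes per event is enough to exceed the link limit on the rebuilt message.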
To Reproduce
The following code artificially reproduces the issue:

1. Create a batch with client.CreateBatch() and add items to it until TryAdd returns false.
2. While determining each message's size, TryAdd sets the EventData's AmqpMessage property to the built message (see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventDataBatch.cs#L86).
3. Call SendAsync, passing in that batch.
4. A Diagnostic-Id header is added to each EventData in the batch because the diagnostic source is active (see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubsDiagnosticSource.cs#L232).
5. The EventData.AmqpMessage property is cleared, so on a retry a new AmqpMessage is rebuilt (see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpMessageConverter.cs#L56).
6. Retrying the send can throw a MessageSizeExceededException, because each EventData's AmqpMessage is now rebuilt including the Diagnostic-Id header. This in turn can push the batch over the maximum size for a single message on the link.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class Program
{
    static async Task Main(string[] args)
    {
        // Subscribe to diagnostic sources the way Application Insights does
        DiagnosticListener.AllListeners.Subscribe(new EventHubDiagListener());
        var client = EventHubClient.CreateFromConnectionString("{event_hub_connection_string}");

        var batch = client.CreateBatch();
        batch.TryAdd(new EventData(new byte[] { 0x01, 0x01, 0x00 }));
        // ensure we open the link and get the correct max message size
        await client.SendAsync(batch);

        var random = new Random();
        // try to create a batch that is ~ the max message size
        batch = client.CreateBatch();
        while (true)
        {
            var buffer = new byte[32750];
            random.NextBytes(buffer);
            var eventData = new EventData(buffer);
            if (!batch.TryAdd(eventData))
                break;
        }

        await client.SendAsync(batch);
        await client.SendAsync(batch); // simulate a retry of the same batch
    }
}

public class EventHubDiagListener : IObserver<DiagnosticListener>
{
    public void OnCompleted() { }
    public void OnError(Exception error) { }
    public void OnNext(DiagnosticListener value)
    {
        // Subscribe only to the Event Hubs source, which adds the Diagnostic-Id header
        if (value.Name == "Microsoft.Azure.EventHubs")
        {
            value.Subscribe(new EventHubsObserver());
        }
    }

    // A no-op observer: having a subscriber is what makes the diagnostic source active
    private class EventHubsObserver : IObserver<KeyValuePair<string, object>>
    {
        public void OnCompleted() { }
        public void OnError(Exception error) { }
        public void OnNext(KeyValuePair<string, object> value) { }
    }
}
```
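To make the sequence of steps concrete without an Event Hubs namespace, here is a small self-contained C# model. It is a sketch under stated assumptions, not the SDK's code: ModelEvent, ModelBatch and Send are hypothetical names, the "encoding" simply adds body bytes to property key/value lengths, and the 1000-byte limit is illustrative. It mirrors only what the issue describes: the size is measured and cached at TryAdd time, the Diagnostic-Id header is added at send time, and the cache is then cleared, so a retry re-encodes with the header.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of the failure mode; none of these types are part of Microsoft.Azure.EventHubs.
public sealed class ModelEvent
{
    public byte[] Body { get; }
    public Dictionary<string, string> Properties { get; } = new Dictionary<string, string>();
    // Stands in for EventData.AmqpMessage: the encoded size cached at TryAdd time.
    public long? CachedEncodedSize { get; set; }

    public ModelEvent(byte[] body) => Body = body;

    // Crude stand-in for AMQP encoding: body bytes plus property key/value lengths.
    public long Encode() =>
        Body.Length + Properties.Sum(p => (long)p.Key.Length + p.Value.Length);
}

public sealed class ModelBatch
{
    private readonly List<ModelEvent> _events = new List<ModelEvent>();
    public long MaxSize { get; }
    public long CurrentSize { get; private set; }

    public ModelBatch(long maxSize) => MaxSize = maxSize;

    // Like TryAdd: encode once, cache the result, and count that size toward the limit.
    public bool TryAdd(ModelEvent e)
    {
        long size = e.Encode();
        if (CurrentSize + size > MaxSize) return false;
        e.CachedEncodedSize = size;
        CurrentSize += size;
        _events.Add(e);
        return true;
    }

    // Like SendAsync with an active diagnostic source: stamp the header on every event,
    // use the cached encoding if present, otherwise re-encode (now including the header).
    // Returns the size that would go on the wire, then clears the cache.
    public long Send(string diagnosticId)
    {
        long wireSize = 0;
        foreach (var e in _events)
        {
            e.Properties["Diagnostic-Id"] = diagnosticId;
            wireSize += e.CachedEncodedSize ?? e.Encode();
            e.CachedEncodedSize = null; // cache cleared after the send attempt
        }
        return wireSize;
    }
}

public static class Program
{
    public static void Main()
    {
        const long linkLimit = 1000; // illustrative, not the real 1046528-byte limit
        var batch = new ModelBatch(linkLimit);
        while (batch.TryAdd(new ModelEvent(new byte[90]))) { }

        long first = batch.Send("|abc.1.");
        long retry = batch.Send("|abc.1.");
        // prints: measured=990 first=990 retry=1210 limit=1000
        Console.WriteLine($"measured={batch.CurrentSize} first={first} retry={retry} limit={linkLimit}");
    }
}
```

In this model the first attempt reuses the cached encodings of all 11 events and fits, while the retry re-encodes each of them with the 20-byte header ("Diagnostic-Id" plus "|abc.1.") and overshoots the limit, matching the pattern in the log above.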
Environment:
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @jfggdl.
Thanks for reporting the issue with a lot of helpful pointers. I will check the repro code and have this fixed in the next release.
Fix under review https://github.com/Azure/azure-sdk-for-net/pull/13966
Fix merged. This will be available in the 4.3.0 release.