Azure-sdk-for-net: [BUG] EventHubs MessageSizeExceededException on retry of EventDataBatch when tracing enabled

Created on 11 May 2020  路  4Comments  路  Source: Azure/azure-sdk-for-net

Describe the bug
We intermitently get a MessageSizeExceededException exception when attempting to send a EventDataBatch on one of our production services. From testing i think this occurs when we retry sending a EventDataBatch and have diagnostic tracing enabled (specifically we are using application insights which subscribes to the "Microsoft.Azure.EventHubs" diagnostic source).

Expected behavior
No MessageSizeExceededException exception is thrown when we retry to send a batch

Actual behavior (include Exception or Stack Trace)
The exception we get is

MessageSizeExceededException: The received message (delivery-id:0, size:1068766 bytes) exceeds the limit (1046528 bytes) currently allowed on the link.
Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender+d__10.MoveNext():445
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender+d__10.MoveNext():766
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventDataSender+d__7.MoveNext():233
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventHubClient+d__22.MoveNext():222
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task):46
Microsoft.Azure.EventHubs.EventHubClient+d__23.MoveNext():149
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw():12
TrafficService.Services.AzureEventHubService+d__12.MoveNext() in D:\home\site\repository\TrafficService\Services\AzureEventHubService.cs:148

We added logging to write out the state of the batch (using reflection) after it failed to send, this shows that batch's internal state thinks the the total size is small than the max size as expected. This is because it didn't count the Diagnostic-Id header when building the batch a this is added only during the call to SendAsync

Batch failed to be sent to event hub. Number of items 273, MaxSize (bytes) = 1048576, CurrentSize (bytes) = 1045428

To Reproduce
The following code artificially reproduces the issue we are having by

  1. Subscribing to the event hubs diagnostic source (in our actual application we have installed the application insights library which subscribes to the "Microsoft.Azure.EventHubs" diagnostic source.)
  2. Calling client.CreateBatch() and adding items to it until TryAdd returns false

    • Each EventData populates it's AmqpMessage property with the built message as it determines the message size ( see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventDataBatch.cs#L86)

  3. Calling SendAsync passing in that batch

    • At this point the Diagnostic-Id header is added to each EventData in the batch because the diagnostic source is active (see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubsDiagnosticSource.cs#L232)

    • During the process of adding each eventdata to the final message, the EventData.AmqpMessage property is cleared, and so on a retry a new AmqpMessage is rebuilt (see https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpMessageConverter.cs#L56)

  4. The request fails for some unkown reason, and we retry sending it
  5. The request fails with a MessageSizeExceededException as each EventData's AmqpMessage is rebuilt including the Diagnostic-Id header now. This in turn can cause the batch to exceed the maximum size for a single message on the link
static async Task Main(string[] args)
{
    DiagnosticListener.AllListeners.Subscribe(new EventHubDiagListener());

    var client = EventHubClient.CreateFromConnectionString("{event_hub_connection_string");

    var batch = client.CreateBatch();
    batch.TryAdd(new EventData(new byte[] { 0x01, 0x01, 0x00 }));

    //ensure we open the link and get the correct max message size
    await client.SendAsync(batch);

    var random = new Random();

    // try create a batch that ~ max message size
    batch = client.CreateBatch();

    while (true)
    {
        var buffer = new byte[32750];
        random.NextBytes(buffer);
        var eventData = new EventData(buffer);
        var result = batch.TryAdd(eventData);
        if (!result)
            break;
    }

    await client.SendAsync(batch);
    await client.SendAsync(batch);  // Simulate retry of batch
}

public class EventHubDiagListener : IObserver<DiagnosticListener>
{
    public void OnCompleted() {}
    public void OnError(Exception error) { }
    public void OnNext(DiagnosticListener value)
    {
        if (value.Name == "Microsoft.Azure.EventHubs")
        {
            value.Subscribe(new EventHubsObserver());
        }
    }

    private class EventHubsObserver : IObserver<KeyValuePair<string, object>>
    {
        public void OnCompleted() { }
        public void OnError(Exception error) { }
        public void OnNext(KeyValuePair<string, object> value) { }
    }
}

Environment:

  • Name and version of the Library package used: Microsoft.Azure.EventHubs repo'd on version 3.0.0 and 4.2.0 (latest)
  • Hosting platform or OS and .NET runtime version: Azure app service (with application insights)
  • IDE and version : Visual Studio 16.5.4
Client Event Hubs Service Attention customer-reported needs-team-attention question

All 4 comments

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @jfggdl.

Thanks for reporting the issue with a lot of helpful pointers. I will check the repro code and have this fixed in the next release.

Fix merged. This will be available by 4.3.0 release.

Was this page helpful?
0 / 5 - 0 ratings