Describe the bug
Memory usage is increasing when there are no messages to process from EventHub.
I have a service running in Kubernetes that use new SDKv5 (Azure.Messaging.EventHubs.Processor v. 5.0.1). Over the night when there are no new messages in EventHub, memory usage of this service looks like this:

(because of memory usage limit in k8s, service gets an out of memory exception at some point and it gets restarted)
It can be easily reproduced with the sample console app below.
Some memory usage snapshots while the sample console app was running:

After snapshot 4 I've sent one message to the EventHub.
Last snapshot compared to the first one:

To Reproduce
Use the sample code below (it's a bit modified code based on sample from here: https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-reading-events)
The EventHub I used for testing has 4 partitions.
class Program
{
static async Task Main(string[] args)
{
await ProcessUntilCanceled(CancellationToken.None);
}
private static async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
var storageConnectionString = "fill_the_blanks";
var blobContainerName = "fill_the_blanks";
var eventHubsConnectionString = "fill_the_blanks";
var eventHubName = "fill_the_blanks";
var consumerGroup = "fill_the_blanks";
Task processEventHandler(ProcessEventArgs eventArgs)
{
if (eventArgs.HasEvent)
{
Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
}
return Task.CompletedTask;
}
Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"Error on Partition: {eventArgs.PartitionId}, Error: {eventArgs.Exception.Message}");
return Task.CompletedTask;
}
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var clientOptions = new EventProcessorClientOptions
{
TrackLastEnqueuedEventProperties = true,
MaximumWaitTime = TimeSpan.FromMilliseconds(100)
};
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName, clientOptions);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
processor.PartitionInitializingAsync += ProcessorOnPartitionInitializingAsync;
await processor.StartProcessingAsync();
try
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}
}
private static Task ProcessorOnPartitionInitializingAsync(PartitionInitializingEventArgs arg)
{
Console.WriteLine(arg.PartitionId);
return Task.CompletedTask;
}
}
Hi @gregmus. Thanks for reaching out to help us understand the issue that you're running into, and for the detailed report. This isn't something that we've seen as a pattern across our stress clients during sustained runs an is something that I want to spend some time digging into a bit.
Can you clarify for me whether the behavior that you're seeing was limited to container-based scenarios or that you had seen when just running your sample app locally? What OS and .NET Core version are you running in the container, and if you're seeing this with local runs as well, would you share the same information about that platform?
Hi @jsquire.
To be honest I haven't seen that as well before when my services were running with SDKv4. Lately I started working with SDKv5 and migrated couple of services running in k8s in development and test environments. When there is no messages flow in the system I can see a regular restarts pattern, caused by memory usage limits, on pods running with SDKv5.
Today there was a small performance test performed in one environment.
Here is the event hub messages flow:

and here is the memory usage:

as you can see, during the test it all works fine, but before and just after the test, when there is no messages flow, memory spikes up again.
Service from the screenshots above is running in kubernetes. It's a container based on "mcr.microsoft.com/dotnet/core/sdk:3.1-buster" docker image, so it's a Debian 10 with .net core 3.1.
Of course I assumed that it is my code doing something wrong here, but at least that got me suspicious ;)
That's why I created a sample console app with the code I posted before, so without any of my custom logic, and seen the same memory usage pattern.
Sample console app (.net core 3.1) I was running locally from Visual Studio Professional 2019 (version 16.4.5), Windows 10 Pro.
@jsquire Could this be related to the fact that in the good case the linked token source created in
is never disposed and thus cannot be GCed even though you are explicitly setting it to null?
Ah after looking multiple times I found it deep down in the processor dispose implementation. Never mind then my comment
I've got a related question on this. The patterns in the documentation are all short-lived, essentially processing a batch in its entirety and then disposing of the Client class after completion.
Is that the required pattern based on internal implementation for cleanup (and could not following it lead to a memory leak like what's outlined here), or is that just a matter of convenience for short code snippets?
I also have a related question. In Processor examples there is one about sending heartbeat message for "ensuring that the handler for processing events is invoked on a fixed interval when no events are available":
Is that a recommended approach to keep the handler alive when no events are available? Could that be a workaround to avoid increasing memory usage in this case?
Ah after looking multiple times I found it deep down in the processor dispose implementation. Never mind then my comment
Thanks for helping to debug, @danielmarbach. As you alluded to, ownership of the linked cancellation source is passed off to the partition processor created on the following line and then disposed when partition processing stops during disposal of the partition processor (line 1417).
Service from the screenshots above is running in kubernetes. It's a container based on "mcr.microsoft.com/dotnet/core/sdk:3.1-buster" docker image, so it's a Debian 10 with .net core 3.1.
Sample console app (.net core 3.1) I was running locally from Visual Studio Professional 2019 (version 16.4.5), Windows 10 Pro.
Thank you for the additional context, @gregmus. While I definitely want to poke at things more to be sure that we don't have a leak, one of the things that I notice in the snippet that you were kind enough to share is that you're using a wait time of 100ms and the memory snapshot seems to surface an OperationCanceledException pretty high in the list. Effectively, what that value says is "please ping my handler every 100ms whether there is an event available or not" and sets the underlying AMQP infrastructure to do likewise when talking to the Event Hubs service. This would be equivalent to setting the following options in the 4.x EventProcessorHost:
// Microsoft.Azure.EventHubs.Processor.EventProcessorOptions (v4.x)
var options = new EventProcessorOptions
{
ReceiveTimeout = TimeSpan.FromMilliseconds(100),
InvokeProcessorAfterReceiveTimeout = true
};
Communication with the service is timing-based, and the service will respond with an OperationCanceledException that we intercept and interpret as there being no events available from the service. That I see a matching number of ExceptionDispatchInfo entries in the memory snapshot would seem to support that conclusion, since the AMQP library that we're built on appears to use that in one specific place, when the async call to the service is resolved.
When events are flowing through the system, this will most often present sparingly as it is reading from the local pre-fetch cache under the covers and isn't overly sensitive to potential network spikes so you don't end up seeing a long string of empty results. With no events moving through the system and a wait time of 100ms, there will be a large number of exceptions propagating. Here's where I get a bit speculative. I wonder if the number of exceptions are overwhelming GC in the environment and the memory limit for K8s is being triggered before .NET Core GC is being triggered.
The same AMQP stack is used for the basis of the v4.x and v5.x Event Hubs client libraries, so the underlying exception behavior would exist in both. May I ask what the ReceiveTimeout was that you were using with v4.x of the EventProcessorHost? I suspect that you may have been running that with the default of 60 seconds or a comparable value, which is why this behavior didn't surface there. I'd also be curious as to the value set for your InvokeProcessorAfterReceiveTimeout option when using v4.x, fi you don't mind sharing.
I've got a related question on this. The patterns in the documentation are all short-lived, essentially processing a batch in its entirety and then disposing of the Client class after completion.
Is that the required pattern based on internal implementation for cleanup (and could not following it lead to a memory leak like what's outlined here), or is that just a matter of convenience for short code snippets?
@OperatorOverload: Apologies for the confusion. The patterns in the samples and snippets are short-lived only in order to illustrate good practices in cleaning up resources. Because they're intentionally constrained to small scopes and scenarios of a single focus, they don't illustrate a real-world application.
Quick starts and more involved examples that better demonstrate real-world application use are something that should be covered. Embarrassingly, there are still gaps due to the slowness of ongoing efforts to consolidate samples and convert from v4 to v5, which are in progress.
I also have a related question. In Processor examples there is one about sending heartbeat message for "ensuring that the handler for processing events is invoked on a fixed interval when no events are available":
Is that a recommended approach to keep the handler alive when no events are available? Could that be a workaround to avoid increasing memory usage in this case?
There is no need to keep a handler alive. The handlers are registered until you unregister them and remain active so long as the processor is running.
The scenarios around heartbeating are largely related to health monitoring, such as emitting telemetry every 5 minutes to answer the questions of "did my processor die or are no events flowing through my system." It also is often used for advanced scenarios for checkpointing, such as "I only want to create a checkpoint every 100 events processed or every 10 minutes, whichever comes first." These are purely concerns of the application and not something required by the processor.
Hi @jsquire
Thanks a lot for details.
When events are flowing through the system, this will most often present sparingly as it is reading from the local pre-fetch cache under the covers and isn't overly sensitive to potential network spikes so you don't end up seeing a long string of empty results. With no events moving through the system and a wait time of 100ms, there will be a large number of exceptions propagating. Here's where I get a bit speculative. I wonder if the number of exceptions are overwhelming GC in the environment and the memory limit for K8s is being triggered before .NET Core GC is being triggered.
I've set quite high memory limit in k8s. Without messages flow service is restarting every 30 minutes, I suspect that GC would be triggered within that time.
The same AMQP stack is used for the basis of the v4.x and v5.x Event Hubs client libraries, so the underlying exception behavior would exist in both. May I ask what the
ReceiveTimeoutwas that you were using with v4.x of theEventProcessorHost? I suspect that you may have been running that with the default of 60 seconds or a comparable value, which is why this behavior didn't surface there. I'd also be curious as to the value set for yourInvokeProcessorAfterReceiveTimeoutoption when using v4.x, fi you don't mind sharing.
SDKv5 doesn't have a built-in support for batch processing. I had to implement batch processing support based on your sample:
https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample09_ProcessEventsByBatch.cs
With this approach messages are piling up in a batch per partition until it hits the max batch size limit to flush the batch for processing. If a batch size will not be reached, messages will be flushed after ReceiveTimeout (that triggers ProcessEventAsync event with an empty message). I need the batch to be flushed as soon as possible - when it hits the batch size, or after shortest possible timeout when there are no more messages available.
You are correct, my previous implementation based on SDKv4 was using different ReceiveTimeout.
It was running with ReceiveTimeout=10s and InvokeProcessorAfterReceiveTimeout=true.
Apologies for the slow reply. Before I share updates, I wanted to touch on something that you had mentioned:
SDKv5 doesn't have a built-in support for batch processing.
You’re correct that 5.0.1 doesn’t include types that support batching; unfortunately, we needed to defer some features for a short while to meet our initial GA goals. Our forthcoming v5.1.0 release (currently in preview) contains two new primitive types which are intended for use with more specialized and demanding scenarios.
The EventProcessor<TPartition> fills a similar role as the EventProcessorClient, with cooperative load balancing and resiliency as its core features but offering native batch processing, extension points for using different storage providers, and a lighter, less opinionated API. The caveat is that this comes in the form of an abstract base, which needs to be extended and the core “handler” activities implemented via override. More on the design and philosophy behind this can be found [here].
We're at a good point of stability within the changes for this month's milestone, so I've begun doing some sustained runs through our stress harness. I've enhanced our normal metrics to do some additional memory sampling as well, to see if there are any patterns that I notice. Thus far, I'm seeing memory use in a constant range over 3 and 5 day runs with about a 50mb delta between normal and peak usage.
Granted, our scenarios are to exercise the publish/consume loop with events flowing through the system at different rates, so they're not directly comparable to your usage. Another huge difference is that I'm using the v5.1.0 version of the EventHubProcessorClient, which received a very large overhaul.
Once we complete the testing needed for this month's release, I'll add some additional rigor that more closely matches this scenario and see if I can get a local reproduction to observe. Just for my awareness, can you share the memory limit that you've set for K8s so that I can use that as an additional data point for metrics?
Once we complete the testing needed for this month's release, I'll add some additional rigor that more closely matches this scenario and see if I can get a local reproduction to observe. Just for my awareness, can you share the memory limit that you've set for K8s so that I can use that as an additional data point for metrics?
Sure. My limit currently is set to 512Mi in k8s.
The
EventProcessor<TPartition>fills a similar role as theEventProcessorClient, with cooperative load balancing and resiliency as its core features but offering native batch processing, extension points for using different storage providers, and a lighter, less opinionated API. The caveat is that this comes in the form of an abstract base, which needs to be extended and the core “handler” activities implemented via override. More on the design and philosophy behind this can be found [here].
Btw. in SDKv4 partition context was providing information about most recent checkpointed lease - most importantly last checkpointed sequence number. Is there a specific reason why this information is not available anymore? (or maybe I can't find it in current SDK)
Btw. in SDKv4 partition context was providing information about most recent checkpointed lease - most importantly last checkpointed sequence number. Is there a specific reason why this information is not available anymore? (or maybe I can't find it in current SDK)
This would have been part of the early preview designs, going back to last autumn, so forgive me for not remembering the exact details. If I recall, the short answer is that the cost of increased complexity for developers learning to use the processor was not justified by the small convenience gained.
I remember that our user studies across languages showed that there was some confusion with having "Last Checkpointed" information in the context; it wasn't always clear to folks who was responsible for creating checkpoints - does the processor do that automatically and then show me what it did? When I call to create a checkpoint, is that just an extra one - will that overwrite the automatic checkpoints?
This is an area that we felt it was extremely important to represent clearly and ensure that developers using the library understand that they hold responsibility for managing checkpoints. Tracking the information needed to answer "what was the last thing that I created a checkpoint for" is viewed as straightforward task with little complexity, appropriate for the application code creating the checkpoints to own.
Just in case it is helpful, I'll mention that because the EventProcessor<TPartition> is focused on more advanced scenarios, the context for a partition is open for extension. The last checkpointed sequence, and other custom data could easily be held there and would flow as part of the processor's call context.
@jsquire I think I found the culprit, see
https://github.com/Azure/azure-amqp/issues/158
Maybe that helps. If my investigation was right then this affects also Service Bus SDK (that's where I reproduced it)
maybe the above comment is also of interest for you @nemakam @JoshLove-msft @ShivangiReja
@jsquire I think I found the culprit, see
Wow. That is a solid catch! I've been making the assumption that it was in the client library level and haven't yet found anything that would fit the cause. This sure does seem to explain why the behavior is only observed during periods without events flowing through the system. This is fantastic, @danielmarbach. I can't tell you how much we appreciate the assist here!
//fyi: @serkantkaraca, @JamesBirdsall, @sjkwak - This would likely be worth considering a release of Microsoft.Azure.EventHubs as well.
Apologies for the delay; thank you, again, for brining attention to this. I've been running different scenarios through our stress harness and some stand-alone samples including the one that you detailed. I've not been able to reproduce the same rapid growth that was impacting you with v5.1, which contains a non-trivial refactoring of the EventProcessorClient.
In your scenario, I started at roughly 1.5mb of memory for a console application. In a 6 hour run with instrumentation and memory snapshots, ran I did notice a slow and persistent growth of the managed heap in increments of roughly 1.2mb each 90 minutes. Analysis seems to implicate one of our dependencies. We did a code review with the team that owns that dependency and validated our usage pattern; they are currently doing an analysis to investigate the root cause.
In addition, we did a sweep through the implementation and found a few areas that we could reduce allocation and constrain the lifetime of objects. It doesn't account for much, but every little bit helps.
At this point, we're kind of in a holding pattern awaiting a new drop of the AMQP library and, potentially, a new dependency drop. I'm going to leave this open in the meantime to continue to track things. Once we're able, I'll run another pass and compare results.
The new build of the AMQP library (v2.4.5) was recently released, containing the timer disposal fixes from Daniel. This was included in our v5.2.0-preview.2 release, which I've been running through our stress harness. I'm no longer seeing the same evidence of timers awaiting finalization and I'm seeing a fairly flat memory use, where I'm reasonably sure that the small bit of growth that I'm seeing are a remnant of our instrumentation and metrics tracking within the harness.
I'm going to close this investigation out at this point, but please feel free to reopen if you're continuing to see the behavior using the new package.
Most helpful comment
maybe the above comment is also of interest for you @nemakam @JoshLove-msft @ShivangiReja