Azure-sdk-for-net: [BUG] ServiceFabricProcessor stops processing partition(s)

Created on 30 Aug 2019 · 19 comments · Source: Azure/azure-sdk-for-net

I've now seen this a couple of times since switching to ServiceFabricProcessor about a month ago: suddenly, some partitions stop processing altogether. Restarting the replica helps. This is very hard to diagnose and catch, so I'm not sure what to do here. Maybe someone from the relevant team can step in and give me a hint? Perhaps I need to enable some logging so I can see what's going on?

Labels: Client, Event Hubs, Service Attention, customer-reported

All 19 comments

@JamesBirdsall, @serkantkaraca, @sjkwak : Would one of you be kind enough to offer your thoughts?

@dmytro-gokun Just to be clear, some partitions stop processing while others are still going, and the Task returned by ServiceFabricProcessor.RunAsync is not completed, right? Are you getting any errors reported to ProcessErrorAsync on your event processor class?

@JamesBirdsall

Just to be clear, some partitions stop processing while others are still going

Exactly, some partition(s) stop while others are completely fine and continue crunching new events.

Task returned by ServiceFabricProcessor.RunAsync is not completed, right?

Right.

Are you getting any errors reported to ProcessErrorAsync on your event processor class?

Nope. As far as I can see, it just stops calling my IEventProcessor's ProcessEventsAsync method.

Would it be possible for you to collect client traces? Since it's not reporting errors to the error handler, something must be going wrong deeper down. ServiceFabricProcessor traces ETW events with provider name "Microsoft-Azure-EventHubs-ServiceFabricProcessor" and you can use any of the standard tools to capture them to a file while your application is running.
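For anyone who cannot run an external ETW collector next to the service, one alternative (a different technique from the standard tools suggested above) is to subscribe to the same EventSource in-process with .NET's `System.Diagnostics.Tracing.EventListener` and forward every event to the application log. A minimal sketch, assuming the library's EventSource is named exactly "Microsoft-Azure-EventHubs-ServiceFabricProcessor"; `SfpTraceListener` and its `sink` callback are hypothetical names:

```csharp
using System;
using System.Diagnostics.Tracing;
using System.Linq;

// Hypothetical in-process listener for the ServiceFabricProcessor EventSource.
// Assumption: the library's EventSource uses exactly this provider name.
public sealed class SfpTraceListener : EventListener
{
    public const string ProviderName = "Microsoft-Azure-EventHubs-ServiceFabricProcessor";

    // Receives one formatted line per event (e.g. forward it to your logger).
    private readonly Action<string> _sink;

    public SfpTraceListener(Action<string> sink)
    {
        // The base constructor may already have called OnEventSourceCreated for
        // sources that exist at this point; events arriving before this assignment
        // are dropped by the null check in OnEventWritten.
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
    }

    protected override void OnEventSourceCreated(EventSource eventSource)
    {
        // Called once for every EventSource in the process; enable only the SFP one.
        if (eventSource.Name == ProviderName)
        {
            EnableEvents(eventSource, EventLevel.Verbose);
        }
    }

    protected override void OnEventWritten(EventWrittenEventArgs eventData)
    {
        // Format: UTC timestamp, level, event name, then the payload values.
        string payload = eventData.Payload == null
            ? string.Empty
            : string.Join(", ", eventData.Payload.Select(p => p?.ToString()));
        _sink?.Invoke($"{DateTime.UtcNow:O} [{eventData.Level}] {eventData.EventName}: {payload}");
    }
}
```

Create the listener once at service startup (for example `new SfpTraceListener(line => myLogger.Info(line))`, where `myLogger` stands for whatever logger the service already uses) and dispose it on shutdown, so the traces end up next to the ProcessErrorAsync/CloseAsync log entries.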

@dmytro-gokun, I am going to close this issue due to inactivity, but feel free to open another issue and refer to this one if you have the client traces to continue problem isolation procedures. Thanks.


@jfggdl JFYI, this is what I think happened here. Before switching to ServiceFabricProcessor, I was using EventProcessorHost, with a retry handler inside ProcessEventsAsync to handle transient errors. The only error that was not retried was OperationCanceledException. That worked perfectly with EventProcessorHost for many months, so I kept the same retry logic when migrating to ServiceFabricProcessor. However, ServiceFabricProcessor throws FabricNotPrimaryException when CheckpointAsync is called after the partition has been detached, and my code would retry on that exception. That is where it would hang or crash; I'm not sure which. After excluding FabricNotPrimaryException from the retry handler (letting it propagate), I have not seen the problem again, but I wanted to give it a couple more weeks before reporting it as a solution.
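The fix described above amounts to a retry helper with an explicit list of exceptions that must never be retried. A minimal sketch, assuming a fixed delay between attempts; `RetryHelper.RunWithRetryAsync` and its parameters are hypothetical, and the `isNonRetryable` predicate is where OperationCanceledException and FabricNotPrimaryException (from the `System.Fabric` namespace) would be listed:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Hypothetical helper: runs `action` up to maxAttempts times.
    // Exceptions for which isNonRetryable returns true propagate immediately;
    // other exceptions are retried after `delay` until attempts run out,
    // at which point the last exception propagates.
    public static async Task RunWithRetryAsync(
        Func<Task> action,
        Func<Exception, bool> isNonRetryable,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken ct)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await action().ConfigureAwait(false);
                return;
            }
            catch (Exception ex) when (!isNonRetryable(ex) && attempt < maxAttempts)
            {
                // Treated as transient: wait, then try again.
                // Cancelling ct ends the wait with OperationCanceledException.
                await Task.Delay(delay, ct).ConfigureAwait(false);
            }
        }
    }
}
```

Inside ProcessEventsAsync the predicate could be `ex => ex is OperationCanceledException || ex is FabricNotPrimaryException`, so that a checkpoint attempted after the partition has been detached fails fast instead of being retried against a replica that is no longer primary.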

Sounds good @dmytro-gokun. We will wait a couple of weeks for your findings.

@jfggdl Okay, it happened again last night. Unfortunately, I was not collecting ETW traces at the time, but I have some interesting information from the logger:

  1. At 02:58:51.340Z, ProcessErrorAsync is called. PartitionId = 3, Exception:

    System.InvalidOperationException: Can't create session when the connection is closing.
       at Microsoft.Azure.Amqp.AmqpConnection.AddSession(AmqpSession session, Nullable`1 channel)
       at Microsoft.Azure.Amqp.AmqpCbsLink.OpenCbsRequestResponseLinkAsyncResult.GetAsyncSteps()+MoveNext()
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
       at Microsoft.Azure.Amqp.AmqpCbsLink.EndCreateCbsLink(IAsyncResult result)
       at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
       at Microsoft.Azure.Amqp.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
       at Microsoft.Azure.Amqp.AmqpCbsLink.<>c__DisplayClass4_0.<SendTokenAsync>b__1(IAsyncResult a)
       at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver.CreateLinkAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
       at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan waitTime)
       at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan waitTime)
       at Microsoft.Azure.EventHubs.PartitionReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan waitTime)
       at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver.ReceivePumpAsync(CancellationToken cancellationToken, Boolean invokeWhenNoEvents)

    {"Type":"System.InvalidOperationException", "TargetSite":"Void AddSession(Microsoft.Azure.Amqp.AmqpSession, System.Nullable`1[System.UInt16])", "Message":"Can't create session when the connection is closing.", "Data":{}, "Source":"Microsoft.Azure.Amqp", "HResult":-2146233079}

  2. At 02:58:51.494Z, CloseAsync is called. PartitionId=3, Reason=Cancelled.

  3. Nothing else in the logs: no new OpenAsync call, no replica shutdown, etc.

  4. At 05:05:02.183Z, I restarted the replica manually, since I noticed the partition was not working.

Thank you for this report! This sounds like an issue reported previously, but the customer wasn't able to provide enough information for us to be sure what was going on. With this new info, there appear to be two problems:
1) "Can't create session when the connection is closing" is supposed to be a retryable error, but the underlying Event Hubs client is throwing it as an InvalidOperationException rather than an EventHubsException with IsTransient=true. Therefore the code is treating it as an unrecoverable error and shutting down. I will discuss this with the client maintainer.
2) CloseAsync being called means that the shutdown path is executing. When that path ends, the Task returned by ServiceFabricProcessor.RunAsync() completes, and ServiceFabricProcessor is shut down. It is up to the calling application to monitor that Task and either restart ServiceFabricProcessor or do something sensible with the failure. This path is intended to execute when there has been an error that ServiceFabricProcessor itself cannot recover from (for example, event hub not found), so that it does not appear to be running when it cannot ever make progress.

@JamesBirdsall Thanks a lot for your prompt response!

  1. .... I will discuss this with the client maintainer.

Great! Please let us know if there is any update on that.

  2. ... It is up to the calling application to monitor that Task and either restart ServiceFabricProcessor or do something sensible with the failure.

I think this is the actual reason I see this partition "dying"! I do not do anything specific when RunAsync returns, because I was under the impression that it would never return unless the CancellationToken passed to it was cancelled. Was that an incorrect impression?

If it was, then my current code looks like this:

    processor.RunAsync(_ct);

With your suggestion it should be changed to something like:

private static async Task RunAsync(ServiceFabricProcessor processor, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            await processor.RunAsync(ct);
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Unexpected exception");
        }
    }
}

Or is it a better idea to force a replica switch when RunAsync returns or throws?
Please let me know what you think.

@dmytro-gokun The task returned by RunAsync will complete under TWO conditions: if the cancellation token is cancelled, or if ServiceFabricProcessor (SFP) encounters an unrecoverable error. If you log errors from SFP and have some kind of alerting system to let you know that there's a problem, or just keep an eye on the logs manually, then the while loop you suggest is a reasonable approach. If you want to get sophisticated, you might force a replica switch if it gets N errors in a row without making any progress, but that isn't going to be simple to code.

@dmytro-gokun To restart SFP, you will need to create a new instance of ServiceFabricProcessor in your while loop.
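Combining the two replies above (restart with a fresh instance, and escalate after N failed runs), a supervisor loop might look like the following sketch. It is hypothetical throughout: `runOnce` is assumed to construct a new ServiceFabricProcessor and return the Task from its RunAsync, and `escalate` could raise an alert or force a replica restart. Because this loop cannot see whether events were processed, it approximates "without making any progress" by starting the failure streak over whenever a run lasted at least `healthyRunDuration`:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class ProcessorSupervisor
{
    // Keeps restarting a processor until ct is cancelled.
    //   runOnce:   creates a NEW processor and returns the Task of its RunAsync(ct).
    //   onFailure: called when a run ends without cancellation (ex is null if the
    //              run completed without throwing).
    //   escalate:  called when the failure streak reaches maxConsecutiveFailures.
    public static async Task SuperviseAsync(
        Func<CancellationToken, Task> runOnce,
        Action<Exception> onFailure,
        Func<Task> escalate,
        int maxConsecutiveFailures,
        TimeSpan healthyRunDuration,
        TimeSpan restartDelay,
        CancellationToken ct)
    {
        int consecutiveFailures = 0;
        while (!ct.IsCancellationRequested)
        {
            Exception error = null;
            var stopwatch = Stopwatch.StartNew();
            try
            {
                await runOnce(ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                return; // normal shutdown
            }
            catch (Exception ex)
            {
                error = ex;
            }

            // Completion after cancellation is a normal shutdown; anything else is a failure.
            if (ct.IsCancellationRequested) return;

            // A long run is treated as having made progress, so the streak starts over.
            if (stopwatch.Elapsed >= healthyRunDuration) consecutiveFailures = 0;
            consecutiveFailures++;
            onFailure(error);

            if (consecutiveFailures >= maxConsecutiveFailures)
            {
                await escalate().ConfigureAwait(false);
                consecutiveFailures = 0;
            }

            try
            {
                await Task.Delay(restartDelay, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return; // cancelled while waiting to restart
            }
        }
    }
}
```

The delay between restarts keeps a permanent error, such as an event hub that does not exist, from turning into a tight restart loop.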

@JamesBirdsall Great, I'll follow your advice and give it a spin. I will let you know in a few weeks whether I see more partitions dying after this fix.

In case anyone is interested, I've solved it like this:

        private async Task RunAsync(CancellationToken ct)
        {
            try
            {
                await _processor.RunAsync(ct);
                Log.Error("ServiceFabricProcessor.RunAsync encountered an unrecoverable error");
                RestartCurrentReplica();
            }
            catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
            {
            }
            catch (Exception ex)
            {
                Log.Error(ex);
                RestartCurrentReplica();
            }
        }

This works just fine.


How do you implement "RestartCurrentReplica"?
Do you use FabricClient (RestartReplicaAsync) inside the SF service?

@Kassoul Well, in my case, I throw a FabricException from RunAsync, which results in the replica restarting (as described here: https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicefabric.services.runtime.statefulservicebase.runasync?view=azure-dotnet#remarks). It may not suit your case, though.

This solution is simpler than expected, and it works for me too.
Thanks @dmytro-gokun for sharing your solution for this issue; I probably have the same problem.
