Orleans: Exceptions on stream consumers are not logged

Created on 30 Nov 2018 · 21 Comments · Source: dotnet/orleans

Hey folks!

I don't know if this is the desired behaviour, but I noticed that when a grain receives a stream message and throws an exception, the runtime doesn't even write a Warning log. It just swallows the exception.

If I set FireAndForgetDelivery = false, then the caller gets the exception.

This is my repro code:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Streams;

// Grain interfaces (assumed Guid-keyed, since both grains call GetPrimaryKey()).
public interface IConsumerGrain : IGrainWithGuidKey { }

public interface IPublisherGrain : IGrainWithGuidKey
{
    Task Publish();
}

[ImplicitStreamSubscription("MyStream")]
public class ConsumerGrain : Grain, IConsumerGrain
{
    private readonly ILogger _logger;

    public ConsumerGrain(ILoggerFactory loggerFactory)
    {
        this._logger = loggerFactory.CreateLogger<ConsumerGrain>();
    }

    public override async Task OnActivateAsync()
    {
        var provider = this.GetStreamProvider("MyProvider");
        var stream = provider.GetStream<int>(this.GetPrimaryKey(), "MyStream");

        // Resume existing subscriptions, or subscribe if there are none yet.
        var handles = await stream.GetAllSubscriptionHandles();
        if (handles.Count > 0)
        {
            var tasks = new List<Task>();
            foreach (var handle in handles)
            {
                tasks.Add(handle.ResumeAsync(this.Consume));
            }
            await Task.WhenAll(tasks);
        }
        else
        {
            await stream.SubscribeAsync(this.Consume);
        }
    }

    public Task Consume(int number, StreamSequenceToken token)
    {
        if (number % 2 == 0)
        {
            this._logger.LogInformation($"Stream received a message with an even number: {number}");
        }
        else
        {
            // This is the exception that goes unlogged when FireAndForgetDelivery = true.
            throw new InvalidOperationException($"Ops, we got an odd number and that is not cool: {number}");
        }

        return Task.CompletedTask;
    }
}

public class PublisherGrain : Grain, IPublisherGrain
{
    private IAsyncStream<int> _stream;
    private readonly Random _rnd;
    private readonly ILogger _logger;

    public PublisherGrain(ILoggerFactory loggerFactory)
    {
        this._logger = loggerFactory.CreateLogger<PublisherGrain>();
        this._rnd = new Random();
    }

    public override Task OnActivateAsync()
    {
        var provider = this.GetStreamProvider("MyProvider");
        this._stream = provider.GetStream<int>(this.GetPrimaryKey(), "MyStream");

        // Publish a random number every 2 seconds.
        this.RegisterTimer(_ =>
        {
            return _stream.OnNextAsync(this._rnd.Next());
        }, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2));

        return Task.CompletedTask;
    }

    public Task Publish()
    {
        this._logger.LogInformation("Running tests!");
        return Task.CompletedTask;
    }
}

So, FireAndForgetDelivery makes sense as a way to _ignore_ exceptions on the caller. However, other providers don't have that option (nor would it make sense for them to).

Shouldn't the stream runtime log a Warning saying that it tried to deliver a message but an exception happened? I know it does that if the consumer grain fails to activate (i.e. fails in OnActivateAsync()), but not when the processing method throws...

Any thoughts?


I mean, it is not that the producer should always be notified about the processing status on the consumer (i.e. not every provider should bubble exceptions up to the producer of the message; I know that only happens with SMS because it is a grain method call), but the runtime _could_ log when it fails to execute the "_Delivery_" Task (i.e. when the callback Task fails)...

As a quick alternative, I tried to bump the log level without success:

.ConfigureLogging(logging =>
{
    logging.AddFilter($"{typeof(Grain).FullName}.InvokeException", LogLevel.Debug);
    logging.AddConsole();
})

Hmm, for SMS streams, just as you mentioned, if FireAndForgetDelivery is set to false, then the caller will get the exception.

But for other stream providers, more specifically persistent stream providers, which depend on external queues and an agent pulling from those queues to deliver messages to consumers, the design does not do what you want: the producer will not be notified if a message's delivery fails.

Just a little context on persistent stream providers; the workflow goes as below:
the producer enqueues messages to some external queue and the call returns -> the pulling agent on each silo pulls messages from its queues -> the pulling agent distributes messages to consumers. So by the current design, it does not do what you want, nor is it easy to switch to that. That being said, whether the producer should be notified in this case is a separate question. It would be ideal if the producer could be notified, but it is quite hard to do that without affecting scalability or changing the current design drastically, at least for persistent stream providers. I'm open to suggestions on how to do that, and also on which use cases require that kind of behavior.
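To make the decoupling concrete, here is a deliberately simplified model of that flow. `PullingAgentSketch`, `ProduceAsync` and `PumpOnceAsync` are hypothetical names, not Orleans APIs, and the real pulling agent also deals with batching, cursors and retries. What it illustrates: the producer's call has already completed before any consumer runs, so a consumer exception can only be observed (and logged) on the agent side.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified model of the persistent-stream flow; not Orleans code.
public sealed class PullingAgentSketch<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly List<Func<T, Task>> _consumers = new List<Func<T, Task>>();
    private readonly Action<string> _log;

    public PullingAgentSketch(Action<string> log) => _log = log;

    public void Subscribe(Func<T, Task> consumer) => _consumers.Add(consumer);

    // Producer side: enqueue and return at once; nothing has been delivered yet.
    public Task ProduceAsync(T item)
    {
        _queue.Enqueue(item);
        return Task.CompletedTask;
    }

    // Agent side: drain the queue and hand each item to every consumer.
    public async Task PumpOnceAsync()
    {
        while (_queue.TryDequeue(out var item))
        {
            foreach (var consumer in _consumers)
            {
                try
                {
                    await consumer(item);
                }
                catch (Exception ex)
                {
                    // The producer's call finished long ago, so only the agent
                    // can see this failure; here it simply logs it.
                    _log($"Delivery of {item} failed: {ex.Message}");
                }
            }
        }
    }
}
```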

@xiazen I think you misunderstood.

I don't want the caller to be notified in any case.

What I want is that, regardless of the provider (including SMS), an unhandled exception in the consumer is caught by the Orleans runtime and then logged.

The publish/subscribe concept, by definition, says that the producer shouldn't/doesn't care about how its subscribers process the message. What it should care about, depending on the delivery guarantees/semantics, is just ensuring that the message is delivered to all the subscribers. The word is delivered, not processed.

The processing logic is scoped to the consumer.

Again, I just want to make sure Orleans catches and logs unhandled exceptions on _any_ stream provider at the runtime level.

Does it make sense?

And PS: by default the runtime doesn't log delivery failures, at least not with detailed info. But you can configure the runtime to do that by configuring streaming with your own implementation of IStreamFailureHandler.

Ok great, I'm glad you don't want that, since that would be hard to do. Did you see my comment about IStreamFailureHandler? I think that is what you need. By default I think a no-op failure handler is configured, but if you want, you can configure your own implementation. Here is its interface:

    public interface IStreamFailureHandler
    {
        bool ShouldFaultSubsriptionOnError { get; }
        Task OnDeliveryFailure(GuidId subscriptionId, string streamProviderName, IStreamIdentity streamIdentity, StreamSequenceToken sequenceToken);
        Task OnSubscriptionFailure(GuidId subscriptionId, string streamProviderName, IStreamIdentity streamIdentity, StreamSequenceToken sequenceToken);
    }

As you can see, a custom implementation can log delivery failures by implementing the OnDeliveryFailure method.
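A handler that turns delivery failures into warnings might look roughly like the sketch below. To keep it compilable without Orleans, `IStreamFailureHandlerSketch` is a hypothetical stand-in that mirrors the member names of the interface above but swaps `GuidId`, `IStreamIdentity` and `StreamSequenceToken` for `Guid` and strings; the `Action<string>` logging delegate is likewise an assumption in place of an `ILogger`. How the handler gets registered with a provider is not shown.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in mirroring IStreamFailureHandler's members; the real
// interface uses GuidId, IStreamIdentity and StreamSequenceToken instead.
public interface IStreamFailureHandlerSketch
{
    bool ShouldFaultSubsriptionOnError { get; }
    Task OnDeliveryFailure(Guid subscriptionId, string streamProviderName, string streamId, string sequenceToken);
    Task OnSubscriptionFailure(Guid subscriptionId, string streamProviderName, string streamId, string sequenceToken);
}

// Hypothetical handler: writes a warning for every failure instead of dropping it.
public sealed class WarningFailureHandler : IStreamFailureHandlerSketch
{
    private readonly Action<string> _logWarning;

    public WarningFailureHandler(Action<string> logWarning) => _logWarning = logWarning;

    // Record the failure but keep the subscription alive.
    public bool ShouldFaultSubsriptionOnError => false;

    public Task OnDeliveryFailure(Guid subscriptionId, string streamProviderName, string streamId, string sequenceToken)
    {
        _logWarning($"Delivery failure: provider={streamProviderName} stream={streamId} subscription={subscriptionId} token={sequenceToken}");
        return Task.CompletedTask;
    }

    public Task OnSubscriptionFailure(Guid subscriptionId, string streamProviderName, string streamId, string sequenceToken)
    {
        _logWarning($"Subscription failure: provider={streamProviderName} stream={streamId} subscription={subscriptionId} token={sequenceToken}");
        return Task.CompletedTask;
    }
}
```

Returning `false` from `ShouldFaultSubsriptionOnError` (spelled as in the interface) is a design choice here: the failure is recorded, but the subscription keeps receiving events.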

Unfortunately, configuring a different IStreamFailureHandler is not that easy: you still need to supply it through your own IQueueAdapterFactory implementation and so on. Let me know if you need guidance on that.

Sorry, I didn't see that...

So, in our case it is the SMS provider which is configured with FireAndForgetDelivery...

Do I still need to implement an IQueueAdapterFactory? If I just register the IStreamFailureHandler in DI _after_ registering all the SMS providers, wouldn't that override the one they inject?

If that is not the case, we should have some sort of default behaviour that logs errors as an option...

It is a very basic thing that users expect to see in their logs, so if everyone has to implement the failure handler + the adapter factory, then we're doomed...

Hey, sorry, I didn't mention it: IStreamFailureHandler is only for persistent stream providers, not for SMS. I mentioned it because I thought persistent stream providers were the ones giving you trouble.

Can you explain more about what the problem is? I thought the SMS stream provider was fine, because if you set FireAndForgetDelivery to false then you get logging of delivery failures.

> IStreamFailureHandler is only for persistent stream providers, not for SMS. I mentioned it because I thought persistent stream providers were the ones giving you trouble.

I meant for all providers. It just happens that SMS is the one we are mostly using here...

> Can you explain more about what the problem is? I thought the SMS stream provider was fine, because if you set FireAndForgetDelivery to false then you get logging of delivery failures.

We have streams that use the SMS provider with FireAndForgetDelivery = true. The caller doesn't care about the results, and it is fine that it doesn't get an exception. However, if the consumer code throws an exception (like in my sample in the issue), the exception is not logged and we never know that it actually happened.

Just create a simple silo project, add the code I pasted in the issue, and you will see the behaviour described.

What I was expecting is that, even when FireAndForgetDelivery is true, the stream runtime would log a Warning (or something similar) when a stream consumer fails to process the message.

Hmm, I disagree. Whether it is the runtime's responsibility to log an exception that happens in the consumer (application code) is debatable. If the exception happens in application code, then in my opinion it is the application code's responsibility to log it.

@xiazen okay... Let's put Orleans aside...

If your application code doesn't handle an exception in main(), the application crashes and tells you about the exception.

The way Orleans streams work today, I can't even see that the exception happened. The exception is swallowed and nobody knows what is happening. The runtime should at least tell us that an exception happened, even if it doesn't do anything with it.

I've already spoken with @sergeybykov and he also agrees with me on that...

Let me try to rephrase what I think is the gist of what @galvesribeiro is saying here, to make sure I didn't misunderstand.

When a stream event is delivered to a consumer grain, it is done by the runtime invoking a method on the grain. This is not very different from the runtime invoking a regular grain method call. If a regular grain method throws an unhandled exception, the runtime logs that fact as a warning (before returning that exception to the remote caller).

The expectation that @galvesribeiro had is that when his Consume method throws an unhandled exception, the runtime should do the same thing - log a warning about it, regardless of whether that exception will get delivered back to the producer (the SMS FireAndForgetDelivery=false case) or not.

@galvesribeiro Is my reframing a correct interpretation of your point?

If so, my inclination is to agree that such unhandled exceptions should be logged by the runtime. I see the purpose of IStreamFailureHandler as a different one: to handle skipped stream events, e.g. to save them on the side for offline reprocessing. So I don't think it's the right tool for simply logging unhandled exceptions.
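Until the runtime does this itself, a consumer can approximate the behaviour described above (log a warning, then let the exception propagate exactly as before) by wrapping its callback. This is a minimal sketch: `ConsumerLogging.LogFailures` is a hypothetical helper, and the generic `TToken` stands in for `StreamSequenceToken` so the code compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of Orleans.
public static class ConsumerLogging
{
    // Wraps a stream callback (item, token) so that any exception it raises,
    // thrown synchronously or via a faulted Task, is reported and then rethrown.
    public static Func<T, TToken, Task> LogFailures<T, TToken>(
        Func<T, TToken, Task> consume,
        Action<Exception, T> logWarning)
    {
        return async (item, token) =>
        {
            try
            {
                await consume(item, token);
            }
            catch (Exception ex)
            {
                logWarning(ex, item);
                throw; // keep the original behaviour for whoever awaits delivery
            }
        };
    }
}
```

In the repro's `ConsumerGrain`, one could presumably pass `ConsumerLogging.LogFailures<int, StreamSequenceToken>(this.Consume, (ex, n) => this._logger.LogWarning(ex, "Consume failed for {Number}", n))` to `SubscribeAsync`/`ResumeAsync` in place of `this.Consume`; that wiring is an untested assumption.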

> Is my reframing a correct interpretation of your point?

Couldn't be better!

> So I don't think it's the right tool for simply logging unhandled exceptions.

Yes, I believe it would allow implementing some sort of poison/dead-letter queue. Overkill for _simple_ logging...

I believe the only issue here is that SMS does not log delivery failures when fire&forget is enabled. I agree we should log in that case.

For persistent streams, I believe we already log, as well as call the grain's OnErrorAsync, and the failure handler.

While I do think we should address the SMS issues, Xiao's suggestion of turning off fire&forget is a viable workaround until we get this fixed, as callers can .Ignore() the task returned from OnNextAsync.
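As an illustration of that workaround with logging instead of silent discarding, the hypothetical extension below attaches a continuation to the task a caller would otherwise ignore and reports the failure, if any. It is a sketch, not Orleans' own `Ignore()`; `TaskFaultLogging` and `LogFaults` are assumed names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: like Ignore() it lets the caller move on without awaiting,
// but a failure is reported instead of discarded. The returned task always
// completes successfully once the original task has finished.
public static class TaskFaultLogging
{
    public static Task LogFaults(this Task task, Action<Exception> logWarning)
    {
        return task.ContinueWith(
            t =>
            {
                if (t.IsFaulted)
                {
                    // GetBaseException unwraps the AggregateException added by the TPL.
                    logWarning(t.Exception.GetBaseException());
                }
            },
            TaskScheduler.Default);
    }
}
```

A publisher would then write something like `_stream.OnNextAsync(value).LogFaults(ex => _logger.LogWarning(ex, "Publish failed"));` instead of awaiting. Every publishing call site still has to change.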

> For persistent streams, I believe we already log, as well as call the grain's OnErrorAsync, and the failure handler.

No, it doesn't happen. Nothing is logged.

> While I do think we should address the SMS issues, Xiao's suggestion of turning off fire&forget is a viable workaround until we get this fixed, as callers can .Ignore() the task returned from OnNextAsync.

I don't think it is a viable workaround. Ignore() is something we abolished (after having a hard time with it) across our entire codebase, not to mention that we would need to change every single stream publisher to add it, which is not a real option here...

> No, it doesn't happen. Nothing is logged.

Hmm, odd. I'll look into it. For persistent streams we retry for a configurable period of time. If the message can't be delivered successfully, we log the following:

    var message = $"Exception while trying to deliver msgs to stream {consumerData.StreamId} in PersistentStreamPullingAgentGrain.RunConsumerCursor";
    logger.Error(ErrorCode.PersistentStreamPullingAgent_14, message, exc);

This log line may not be on the same silo as the consumer or producer grains as it is logged by the pulling agent.
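The retry-then-log behaviour described above can be sketched generically like this. It is not the pulling agent's actual code: `RetryingDelivery.DeliverAsync`, its parameters and the fixed delay between attempts are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical sketch of "retry for a configurable period, then log".
public static class RetryingDelivery
{
    // Keeps calling deliver() until it succeeds or retryPeriod has elapsed,
    // then reports the last exception. Returns true if delivery succeeded.
    public static async Task<bool> DeliverAsync(
        Func<Task> deliver,
        TimeSpan retryPeriod,
        TimeSpan delayBetweenAttempts,
        Action<Exception> logError)
    {
        var elapsed = Stopwatch.StartNew();
        while (true)
        {
            try
            {
                await deliver();
                return true;
            }
            catch (Exception) when (elapsed.Elapsed < retryPeriod)
            {
                // Still inside the retry window: wait, then try again.
                await Task.Delay(delayBetweenAttempts);
            }
            catch (Exception ex)
            {
                // Retry window exhausted: give up and log the last error once.
                logError(ex);
                return false;
            }
        }
    }
}
```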

> I don't think it is a viable workaround

It doesn't need to be .Ignore(), just something like it. As for changing all OnNextAsync calls across the codebase, I can see that being a problem if it's a large codebase. An alternative is to decorate the IStreamProvider to return an IAsyncObserver that calls the underlying OnNextAsync in an 'ignore'-like way. These suggestions are just workarounds, but if the current (bad) behavior is not harmful enough to justify them, then, yeah, they're not worth it.
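The wrapper idea can be sketched as follows, reduced to a single publish delegate rather than a full `IStreamProvider`/`IAsyncStream` decorator (implementing those Orleans interfaces is not shown). `NonBlockingPublisher` is a hypothetical name. Its `OnNextAsync` returns a completed task immediately and logs any failure of the underlying call in the background, so existing `await` call sites would keep compiling unchanged.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper around a publish call (in Orleans this would be the
// stream's OnNextAsync). It starts the publish, returns to the caller at once,
// and logs any failure instead of silently swallowing it.
public sealed class NonBlockingPublisher<T>
{
    private readonly Func<T, Task> _publish;
    private readonly Action<Exception> _logWarning;

    public NonBlockingPublisher(Func<T, Task> publish, Action<Exception> logWarning)
    {
        _publish = publish;
        _logWarning = logWarning;
    }

    public Task OnNextAsync(T item)
    {
        // The discard marks the background task as intentionally not awaited.
        _ = ObserveAsync(item);
        return Task.CompletedTask;
    }

    private async Task ObserveAsync(T item)
    {
        try
        {
            // Invoking inside the try also catches exceptions thrown synchronously.
            await _publish(item).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logWarning(ex);
        }
    }
}
```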

Ok, I understand, but still, the codebase is too big and relies too much on streams to make those changes viable...

So, do you have a plan for when this fix will go in?

Thanks!

I think there is general acknowledgement of the issue. The team will discuss a proper fix. Hopefully we can get it into 2.2, but I can't commit to a release at the moment.

Please see pull request #5230.

Sorry @jason-bragg for being lazy... Yeah, that LGTM! Thanks!

Resolved via #5230.
