I would like to write a handy extension method that attempts to write state and, in case of an error, logs it and applies a rollback. However, WriteStateAsync being protected ruins that plan:
```csharp
using System;
using System.Threading.Tasks;
using Orleans;

public static class GrainExtensions
{
    public static async Task WriteStateWithRollback(this Grain grain, Action<Exception> onError, Action rollback)
    {
        try
        {
            await grain.WriteStateAsync(); // compiler error :( WriteStateAsync is protected
        }
        catch (Exception e)
        {
            onError(e);
            rollback();
            throw;
        }
    }
}

// usage:
this.State.SomeCollection.Add(newItem);
await this.WriteStateWithRollback(ex => Log(ex), () => this.State.SomeCollection.Remove(newItem));
```
Would anyone be opposed to perhaps the smallest-ever pull request that exposes WriteStateAsync to the outside world?
Extension methods are a handy instrument. My concern here, though, is that making WriteStateAsync (and other state manipulation methods) public would conceptually violate the intent of the API that only the grain itself should be able to persist/read/clear its own state. Even though it would be nearly impossible to invoke such a method against a different grain anyway, I'm afraid the API would become a bit unclean and ambiguous.
The alternative I can think of is to add WriteStateWithRollback as a protected method of the Grain class. However, unlike an application-defined extension method, this would become a 'permanent' part of the API, so the bar for such a change is naturally higher. There's also a desire to revisit the storage API and introduce a new, more flexible and robust version of it, which we sometimes refer to as Storage 2.0. But there's currently no active development towards that.
Another option that keeps such a feature at the application code level is to have a base class for all your grain classes that would add WriteStateWithRollback. That's probably the easiest option, unless I missed something else.
Yeah, those are the alternatives I came up with as well (minus Storage 2.0, which is beyond me at the moment).
I also agree that adding a new rollback method is probably not the greatest solution. That's the type of addition where everyone is going to pick their own favorite method signature, and few people will be happy with the result.
Assuming you're going to stand firm on keeping WriteStateAsync a protected member 😄, a base class will be fine for my purposes. I'll close this for now.
@moswald, I apologize for derailing your question, but I fear what you're attempting will not work even if the call were public.
Would you be so kind as to describe what the rollback call does?
@moswald Oops, I'm a dork, it's in the call, I missed that.
OK, so here are the problems:
A failure of a storage call does not mean that the operation failed.
Storage systems like Azure Table storage can report failure even if the operation succeeded.
If the operation succeeded but an exception was thrown anyway (for example, a timeout), rolling back the in-memory state will leave the grain out of sync with storage, which is what I think you want to prevent with this change.
Additionally, such an error means your eTag is no longer valid (storage has moved on to a new eTag that the grain never received), so no future storage calls will succeed until the state has been re-read from storage.
Even if the failure does mean that the operation didn't go through, it's not safe to assume that the next attempt will. If the failure is due to an eTag violation, no future calls will succeed until the state is re-read.
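To make the first two problems concrete, here is a toy, self-contained C# simulation. The storage is an in-memory stand-in, not an Orleans storage provider; `Write`, `storedItems`, `storedETag`, and the integer eTags are hypothetical names for illustration only. The store commits the write and then throws a `TimeoutException` (modeling a lost acknowledgement), after which the rollback leaves memory and storage disagreeing and the grain holding a stale eTag.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a storage provider (hypothetical, not an Orleans API).
var storedItems = new List<string> { "a" };
int storedETag = 1;

// Persists the items if the caller's eTag matches, then optionally "loses the ack"
// by throwing even though the write has already taken effect.
int Write(List<string> items, int eTag, bool loseAck)
{
    if (eTag != storedETag)
        throw new InvalidOperationException("eTag mismatch");
    storedItems = new List<string>(items);
    storedETag++;
    if (loseAck)
        throw new TimeoutException("write committed, but the acknowledgement was lost");
    return storedETag;
}

// In-memory grain state and the eTag the grain believes is current.
var state = new List<string> { "a" };
int grainETag = 1;

// Attempt 1: storage accepts the write, but the caller sees a timeout and rolls back.
state.Add("b");
try
{
    grainETag = Write(state, grainETag, loseAck: true);
}
catch (TimeoutException)
{
    state.Remove("b"); // the "rollback" from the extension method above
}
Console.WriteLine($"memory:  {string.Join(",", state)}");       // memory:  a
Console.WriteLine($"storage: {string.Join(",", storedItems)}"); // storage: a,b

// Attempt 2: the grain still holds eTag 1 while storage is at 2, so every write
// fails until the state (and its eTag) is re-read from storage.
state.Add("c");
bool secondWriteFailed = false;
try
{
    grainETag = Write(state, grainETag, loseAck: false);
}
catch (InvalidOperationException)
{
    secondWriteFailed = true;
}
Console.WriteLine($"second write failed: {secondWriteFailed}"); // second write failed: True
```

A real provider's failure modes differ in detail, but the two effects shown here (diverged state after a rollback, and a stale eTag blocking later writes) are the ones described above.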
A further complication is that even if one handles storage calls well, the grain state can get out of sync with storage if *any* error occurs between the first modification of the state and the storage call.
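Consider:

```csharp
this.State.name = user.Name;
this.State.month = user.Birthday.Month; // throws NullReferenceException if Birthday is null
try
{
    await WriteStateAsync();
}
catch (Exception)
{
    // handle the storage failure
}
```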
Regardless of how the write is handled, if Birthday is null, a NullReferenceException is thrown before the write is even attempted, but State.name has already been changed in memory, even though it was never persisted.
The recommended pattern to address such issues is to wrap the entire modification and write of grain state in a try/catch, and call DeactivateOnIdle() if an error is encountered. This will cause the grain to deactivate, and the next call will reactivate the grain, loading a clean state from storage. This is not the only pattern that works, but it is the one I'd suggest if it is important to keep grain state and storage in sync.
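The shape of that pattern can be sketched as another toy simulation, reusing the name/month example. None of these names are Orleans APIs: `storage` stands in for persisted state, `Load` for activation, `UpdateProfile` and `ReadName` for grain methods, and setting `state = null` for `DeactivateOnIdle()`. In a real grain the catch block would call `DeactivateOnIdle()` and rethrow, and Orleans would deactivate the grain once it is idle; the sketch simply drops the in-memory state at that point.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the persisted grain state (not an Orleans API).
var storage = new Dictionary<string, string> { ["name"] = "Ann", ["month"] = "1" };

// The current activation's in-memory state; null means "no activation".
Dictionary<string, string>? state = null;

// Activation always starts from a clean copy of what is in storage.
Dictionary<string, string> Load() => new Dictionary<string, string>(storage);

// A grain method: every mutation and the write sit inside one try block.
void UpdateProfile(string name, int? birthMonth)
{
    state ??= Load(); // the next call after deactivation reactivates the grain
    try
    {
        state["name"] = name;                            // modified in memory...
        state["month"] = birthMonth.Value.ToString();    // ...then throws if birthMonth is null
        storage = new Dictionary<string, string>(state); // stands in for WriteStateAsync()
    }
    catch (Exception)
    {
        state = null; // stands in for DeactivateOnIdle(): discard the half-modified state
        throw;
    }
}

string ReadName()
{
    state ??= Load();
    return state["name"];
}

try
{
    UpdateProfile("Bob", null); // fails after "name" was already changed in memory
}
catch (InvalidOperationException)
{
    Console.WriteLine("update failed; activation discarded");
}
Console.WriteLine(ReadName()); // Ann: reloaded from storage, not the stray "Bob"

UpdateProfile("Bob", 7);
Console.WriteLine($"{storage["name"]} {storage["month"]}"); // Bob 7
```

The design choice is to throw away the whole activation instead of undoing individual changes, so there is no per-method rollback logic to get wrong.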
Again, I apologize for taking this thread in a direction you were not asking about, but I've spent more time than I'd like debugging related issues and would be sad if you ended up doing so as well.
Don't apologize, this is incredibly useful information; it pretty much exactly explains why my wanting to roll back state wouldn't work. My imaginary rollback extension method was going to be a bit more complex, to handle the case where the write succeeded but a different exception was thrown, but I doubt I was going to catch all of that.
Thanks, I'll start re-thinking how we're handling state.
@jason-bragg Thanks for the great explanation. It resolves a lot of my confusion about what a WriteStateAsync() failure implies. It would be great to see a related discussion in the Grain Persistence section.
In my project, I call DeactivateOnIdle() whenever WriteStateAsync() fails within a grain. However, I have found that calling DeactivateOnIdle() seems to remove the stream subscription if the grain explicitly subscribed to a stream. After I call DeactivateOnIdle() in this grain, the producer gets the following exception when it tries to push more data into the stream:
```
Extension not installed on grain XXXX attempting to invoke type Orleans.Streams.OrleansCodeGenStreamConsumerExtensionMethodInvoker from invokable Orleans.Runtime.ActivationData
Exception = Orleans.Runtime.GrainExtensionNotInstalledException: Extension not installed on grain XXXX attempting to invoke type Orleans.Streams.OrleansCodeGenStreamConsumerExtensionMethodInvoker from invokable Orleans.Runtime.ActivationData
```
Is this the expected behavior? It seems to contradict the documentation:
> If the consumer of the stream dies (or its grain is deactivated) and new event is generated on the stream, the consumer grain will be automatically re-activated (just like any regular Orleans grain is automatically activated upon message to it).

> I find out that a DeactivateOnIdle() call seems like remove the stream subscription if a grain explicitly subscribed to a stream.
This should not be the case. Explicit subscriptions are persistent. If a grain deactivates, but does not unsubscribe from a stream, the next event on that stream will reactivate the grain, and if the grain does not resume processing that stream during activation, the producer will get the exception you are seeing.
If one wishes to stop processing events on a stream upon deactivation, one needs to unsubscribe through the stream's StreamSubscriptionHandle<T>.
If one wishes to continue processing events on a stream, but is deactivating due to idleness or error, on activation one will need to query for the existing subscriptions (IAsyncStream<T>.GetAllSubscriptionHandles()) and resume each of them (StreamSubscriptionHandle<T>.ResumeAsync), for example:
```csharp
// Run during activation (e.g. in OnActivateAsync).
var streamProvider = GetStreamProvider(streamProviderName);
var stream = streamProvider.GetStream<MyEvent>(streamGuid, streamNamespace);
// Re-attach handlers to every subscription that outlived the previous activation.
foreach (StreamSubscriptionHandle<MyEvent> handle in await stream.GetAllSubscriptionHandles())
{
    await handle.ResumeAsync(OnNextAsync, OnErrorAsync, recoveryToken);
}
```
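The reactivation sequence can also be modeled with a toy simulation. None of these names are Orleans APIs: `subscriptions` stands in for the persistent subscription registry, `handlers` for the per-activation callbacks that `ResumeAsync` attaches, `Activate` for `OnActivateAsync`, and `Deliver` for the streaming runtime pushing an event. It sketches two points from the explanation above: the event itself reactivates the consumer, and delivery fails (analogous to the `GrainExtensionNotInstalledException` above) unless activation resumes the subscription.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model (not Orleans APIs) of one consumer grain and one stream.
// The subscription is persistent: it outlives any single activation.
var subscriptions = new HashSet<string> { "stream-1" };

// Per-activation callbacks; a fresh activation starts with none attached.
Dictionary<string, Action<int>>? handlers = null;
var received = new List<int>();

// Stands in for OnActivateAsync. Resuming re-attaches a callback to each
// persistent subscription, the way ResumeAsync does for a real handle.
void Activate(bool resumeOnActivate)
{
    handlers = new Dictionary<string, Action<int>>();
    if (resumeOnActivate)
        foreach (var id in subscriptions)
            handlers[id] = item => received.Add(item);
}

// Stands in for the runtime pushing an event. An event for a deactivated grain
// reactivates it first, so no separate "ping" from another grain is needed.
void Deliver(string streamId, int item, bool resumeOnActivate)
{
    if (handlers is null)
        Activate(resumeOnActivate);
    if (!handlers!.TryGetValue(streamId, out var onNext))
        throw new InvalidOperationException("no handler attached for " + streamId);
    onNext(item);
}

// Deactivated without unsubscribing, and activation does not resume: delivery fails.
bool producerSawError = false;
try
{
    Deliver("stream-1", 1, resumeOnActivate: false);
}
catch (InvalidOperationException)
{
    producerSawError = true;
}

// Deactivate again; this time activation resumes the subscription.
handlers = null;
Deliver("stream-1", 2, resumeOnActivate: true);

Console.WriteLine($"producer saw error: {producerSawError}"); // producer saw error: True
Console.WriteLine($"received: {string.Join(",", received)}"); // received: 2
```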
@luomai I had that same problem the first time I set up streaming. It's easy to overlook the key bit:
> will reactivate the grain, and _if the grain does not resume processing that stream during activation_
You have to do work inside your OnActivateAsync to signal that you're available to process the stream again.
@moswald @jason-bragg Thanks for the quick replies. Does this mean that some grain other than the stream producer has to explicitly trigger (e.g., ping) the OnActivateAsync() method of a deactivated stream consumer? Is that correct?
EDIT:
I just reread the documentation and found that the following comment describes the exact case in my project. It resolves my confusion. Thanks a lot!
> COMMENT: If the consumer grain implements the IAsyncObserver<T> interface directly (`public class MyGrain<T> : Grain, IAsyncObserver<T>`), it should in theory not be required to re-attach the IAsyncObserver and thus will not need to call ResumeAsync. The streaming runtime should be able to automatically figure out that the grain already implements IAsyncObserver and will just invoke those IAsyncObserver methods. However, the streaming runtime currently does not support this, and the grain code still needs to explicitly call ResumeAsync, even if the grain implements IAsyncObserver directly. Supporting this is on our TODO list.