I've seen in the documentation about how "Orleans is not suited for long running work" (Generally) - I may be paraphrasing. There are a few issues regarding that same topic #4531, #4370, #3071, and several others that I'm having trouble finding ATM.
The tldr of what I'm hoping to accomplish with Orleans, is to take lots of calls from my app, and dispatch them across a cluster of Orleans silos. Many of these calls that occur are completed in a few milliseconds, but many take several minutes to complete when running in our non-Orleans implementation.
I understand there is a default timeout of 30 seconds for the client to receive a response from the grain, and I would feel weird about increasing that to accommodate our "longest running logic". I feel like there has to be a better way of doing it than just increasing the timeout.
Currently my calls (either fast or slow) look similar to this, and note this is all CPU bound work:
Client.cs
public async Task<MyType> DoWork(Parameter param)
{
var grain = _clusterClient.GetGrain<IMyGrain>(Guid.NewGuid());
return await grain.DoWork(param);
}
Grain.cs
public async Task<MyType> DoWork(Parameter param)
{
return await Task.Run(() =>
{
// CPU bound sync code
});
}
The above seems to work out great when DoWork is fast running, at least at small scale. Prior to using Orleans I was using a LimitedConcurrencyTaskScheduler in order to keep memory usage down; it seemed like I was getting into some situations where I'd have so many scheduled tasks, that all memory would be utilized and nothing would complete.
The above code does not work without setting the timeout to at least as high as our "longest running operation". It seems that a lot of suggestions on getting around this are to "return immediately that the task has been started, then get the result when it's finished", but I'm not finding any samples on how to accomplish this, at least from both a client and grain perspective.
Could some samples be provided on how to accomplish what I'm going after? It seems that some sort of check that an invocation is complete would be needed, but I'm not sure how to go about that. Would this require my grains to become stateful, rather than stateless? #4370 seems like it may be of use, but I'm only seeing it from the grain perspective, not what it looks like from the client. Ideally my fast and long running client/grains will have similar looking implementations, but if I have to break out into separate methods to accommodate this, that's fine as well.
Hopefully this all makes sense!
I'm not sure if this is at all scalable, but here's something I got working. I don't know if this is a standard way of doing it, or if there is some guidance on making it more maintainable and easier to implement, but...
IGrain.cs
public interface IGrain : IGrainWithGuidKey
{
Task<MyType> DoWorkAsync(Parameters param);
}
GrainState.cs
public enum GrainState
{
Initialized,
Working,
CompletedWork,
Faulted,
ShouldDispose
}
MyType.cs
public class MyType { }
Parameters.cs
public class Parameters { }
Client.cs
public async Task<MyType> DoWorkAsync(Parameters param)
{
var grain = _clusterClient.GetGrain<IGrain>(Guid.NewGuid());
// Perform polling of grain until a result is present
while (true)
{
var result = await grain.DoWorkAsync(param);
if (result != null)
{
return result;
}
await Task.Delay(Constants.TaskPollingSeconds);
}
}
Grain.cs
[StorageProvider(ProviderName = Constants.StorageProviderName)]
public class Grain : Grain<GrainState>, IGrain
{
private MyType _result;
public async Task<MyType> DoWorkAsync(Parameters param)
{
switch (State)
{
case GrainState.Faulted:
throw new NotSupportedException(
$"{this} is in state {State} and not available for further invocations."
);
case GrainState.Initialized:
State = GrainState.Working;
await WriteStateAsync();
Task.Run(() =>
{
PerformWorkAsync(param).FireAndForget();
}).FireAndForget();
return (null);
case GrainState.Working:
return (null);
case GrainState.CompletedWork:
State = GrainState.ShouldDispose;
await WriteStateAsync();
return (_result);
default:
throw new ArgumentException($"Unexpected {nameof(State)}");
}
}
private async Task PerformWorkAsync(Parameters param)
{
MyType myType = null;
// Do CPU bound work
_result = myType;
State = GrainState.CompletedWork;
await WriteStateAsync();
}
}
The above seems to be working at a small scale at least, and there are obviously some things I could factor out into an abstract class to avoid some boilerplate. I was curious how this approach fares against the intentions of Orleans, and/or what I can do to improve it?
What are my risks here in using Task.Run? Is there a safer way to do this? I don't want to block against Orleans calls coming through, but I also don't want these grains to blow up memory.
Thanks!
What you are trying to do looks similar to what was discussed in #3071. However, I don't understand some of the parts in your code.
Why do you do WriteStateAsync() after changing state of the grain? I think it should be a volatile (in-memory only) variable because the executing operation seems to make sense only in the context of that grain activation and silo.
I would split DoWorkAsync into two methods: BeginWork and GetStatus. BeginWork would do the Task.Run business, flip the State variable to Working, and return success, meaning that the operation started. GetStatus would simply return the current status, and could be used for polling.
What are my risks here in using Task.Run? Is there a safer way to do this? I don't want to block against Orleans calls coming through, but I also don't want these grains to blow up memory.
Did you look at http://dotnet.github.io/orleans/Documentation/Advanced-Concepts/External-Tasks-and-Grains.html for how Task.Run interoperates with grain threads?
Why do you do WriteStateAsync() after changing state of the grain? I think it should be a volatile (in-memory only) variable because the executing operation seems to make sense only in the context of that grain activation and silo.
I thought I was seeing that in some of the samples of tickets I had included, I did not have a specific reason for it save that.
I would split DoWorkAsync into two methods: BeginWork and GetStatus. BeginWork would do the Task.Run business, flip the State variable to Working, and return success, meaning that the operation started. GetStatus would simply return the current status, and could be used for polling.
Interesting... I guess that would work, where does the result get returned in this case? a third function on the grain interface? I've seen some talk of pub/sub, but I don't have much of any experience with that, so am unsure on how to approach.
Did you look at http://dotnet.github.io/orleans/Documentation/Advanced-Concepts/External-Tasks-and-Grains.html for how Task.Run interoperates with grain threads?
I have seen the page, but I don't think I understand the real implications of it, working with Tasks and async/await in general is pretty new territory for me, so I am afraid that I have missed something.
I guess that would work, where does the result get returned in this case? a third function on the grain interface?
Yeah, that's where it gets tricky. Publishing the result via a stream would be the cleanest (conceptually speaking) approach. But it requires the streaming machinery to be configured, which could be an overkill for your case.
A simple 'hack' would be to return the result as part of the response of GetStatus. A separate method, as you mentioned, is another form of that. The challenge here I think is to correctly handle results from multiple calls and to avoid races between parallel client calls.
I have seen the page, but I don't think I understand the real implications of it, working with Tasks and async/await in general is pretty new territory for me, so I am afraid that I have missed something.
The key here is to execute a long running computation on a non-grain thread, which is achieved via Task.Run scheduling it on the .NET thread pool. However, code that executes on non-grain threads cannot make calls to grains and cannot mutate grain state because that would violate the single-threaded guarantee of grains. This example shows how to marshal control back to a grain thread to be able to make grain calls.
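A minimal sketch of how the two points above could fit together: the result is returned as part of the GetStatus response, and the code running on the thread pool never touches the object's fields. This is plain .NET, not Orleans code. `PollableWorker`, `WorkResponse`, and `WorkStatus` are hypothetical names, and the class only stands in for a grain, whose BeginWork and GetStatus would be grain interface methods.

```csharp
using System;
using System.Threading.Tasks;

public enum WorkStatus { NotStarted, Working, Completed, Faulted }

// Hypothetical response type: the result rides along with the status.
public sealed class WorkResponse<TResult>
{
    public WorkStatus Status { get; set; }
    public TResult Result { get; set; }   // meaningful only when Status == Completed
    public string Error { get; set; }     // meaningful only when Status == Faulted
}

// Plain class standing in for a grain (an assumption for illustration).
public sealed class PollableWorker<TResult>
{
    // In-memory only. The background work never writes to this object;
    // the caller-facing methods just inspect the Task.
    private Task<TResult> _work;

    public Task<bool> BeginWork(Func<TResult> cpuBoundWork)
    {
        if (_work != null)
        {
            return Task.FromResult(false); // already started
        }

        // Task.Run queues the delegate on the .NET thread pool and returns immediately.
        _work = Task.Run(cpuBoundWork);
        return Task.FromResult(true);
    }

    public Task<WorkResponse<TResult>> GetStatus()
    {
        var response = new WorkResponse<TResult>();

        if (_work == null)
        {
            response.Status = WorkStatus.NotStarted;
        }
        else if (!_work.IsCompleted)
        {
            response.Status = WorkStatus.Working;
        }
        else if (_work.IsFaulted || _work.IsCanceled)
        {
            response.Status = WorkStatus.Faulted;
            response.Error = _work.Exception?.GetBaseException().Message;
        }
        else
        {
            response.Status = WorkStatus.Completed;
            response.Result = _work.Result; // does not block: the task has completed
        }

        return Task.FromResult(response);
    }
}
```

A client would call BeginWork once and then poll GetStatus with a delay until the status is Completed or Faulted. Because the only shared object is the Task itself, nothing needs to be marshalled back onto the grain's scheduler in this variant.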
A simple 'hack' would be to return the result as part of the response of GetStatus. A separate method, as you mentioned, is another form of that. The challenge here I think is to correctly handle results from multiple calls and to avoid races between parallel client calls.
With var grain = _clusterClient.GetGrain<IMyGrain>(Guid.NewGuid()); wouldn't that ensure you aren't mixing contexts, so that within the method you are always working with your intended grain? If multiple execution contexts all hit this method, they're all looping with awaits until grain completion, so there shouldn't really be any potential mixing of results across methods, since each grain should have its own primary key, no? Is it problematic to grab potentially multiple instances of your grain? I had assumed not since they already have the concept of primary keys, but perhaps my equating that to a DB is mistaken?
The key here is to execute a long running computation on a non-grain thread, which is achieved via Task.Run scheduling it on the .NET thread pool. However, code that executes on non-grain threads cannot make calls to grains and cannot mutate grain state because that would violate the single-threaded guarantee of grains. This example shows how to marshal control back to a grain thread to be able to make grain calls.
I will really have to read this pretty closely then I suppose eh? :)
with var grain = _clusterClient.GetGrain<IMyGrain>(Guid.NewGuid()); wouldn't that ensure you wouldn't be mixing contexts and within the method should always be working with your intended grain?
If a unique grain is used for each request/context, then you are safe.
I will really have to read this pretty closely then I suppose eh? :)
Don't we always have to read closely when dealing with threading? ;-)
Just to put it all mostly together (I still have to look into how to handle the fire-and-forget appropriately outside of the Orleans context), what do you think of this?
MyType.cs
public class MyType { }
MyTypeDifferent.cs
public class MyTypeDifferent { }
Parameters.cs
public class Parameters { }
ParametersDifferent.cs
public class ParametersDifferent { }
GrainState.cs
public enum GrainState
{
Initialized,
Working,
CompletedWork,
Faulted,
ShouldDispose
}
IPollableGrain.cs
public interface IPollableGrain<TResult>
{
Task<GrainState> CheckStatusAsync();
Task<TResult> GetResultAsync();
}
IGrain.cs
public interface IGrain<MyType> : IPollableGrain<MyType>, IGrainWithGuidKey
{
Task<bool> BeginWorkAsync(Parameters param);
}
IGrainDifferent.cs
public interface IGrainDifferent<MyTypeDifferent> : IPollableGrain<MyTypeDifferent>, IGrainWithGuidKey
{
Task<bool> BeginDifferentWorkAsync(ParametersDifferent param);
}
Client.cs
public class Client
{
protected async Task<TResult> PollWorkUntilCompleteAsync<TResult>(
IPollableGrain<TResult> pollableGrain
)
{
while (true)
{
var state = await pollableGrain.CheckStatusAsync();
if (state == GrainState.CompletedWork)
{
return await pollableGrain.GetResultAsync();
}
await Task.Delay(Constants.TaskPollingSeconds);
}
}
public async Task<MyType> DoWorkAsync(Parameters param)
{
var grain = _clusterClient.GetGrain<IGrain<MyType>>(Guid.NewGuid());
await grain.BeginWorkAsync(param);
return await PollWorkUntilCompleteAsync(grain);
}
public async Task<MyTypeDifferent> DoDifferentWorkAsync(ParametersDifferent param)
{
var grain = _clusterClient.GetGrain<IGrainDifferent<MyTypeDifferent>>(Guid.NewGuid());
await grain.BeginDifferentWorkAsync(param);
return await PollWorkUntilCompleteAsync(grain);
}
}
GrainBase.cs
public abstract class GrainBase<TResult> : Grain<GrainState>, IPollableGrain<TResult>
{
protected TResult Result;
/// <summary>
/// Kicks off <see cref="DoWorkAsync"/>, this method should be invoked
/// via actual grain implementation after saving necessary parameters as members of the instance.
/// This is done to keep a consistent task execution abstraction, one place to update if it's ever
/// changed.
/// </summary>
/// <remarks>Calls <see cref="DoWorkAsync"/></remarks>
/// <returns></returns>
protected Task<bool> BeginGrainWorkAsync()
{
if (State == GrainState.Initialized)
{
State = GrainState.Working;
Task.Run(() =>
{
DoWorkAsync().FireAndForget();
}).FireAndForget();
return Task.FromResult(true);
}
return Task.FromResult(false);
}
/// <summary>
/// The CPU bound work to do related to the grain.
/// Many grain methods will have differing parameters required,
/// those parameters should be saved to the instance for utilization within <see cref="DoWorkAsync"/>.
///
/// <see cref="Result"/> should be set with the outcome of this method,
/// and State set to <see cref="GrainState.CompletedWork"/>
/// </summary>
/// <remarks>Invoked via <see cref="BeginGrainWorkAsync"/></remarks>
/// <returns></returns>
protected abstract Task DoWorkAsync();
public Task<GrainState> CheckStatusAsync()
{
// just in case?
if (Result != null)
{
State = GrainState.CompletedWork;
}
return Task.FromResult(State);
}
public Task<TResult> GetResultAsync()
{
if (State != GrainState.CompletedWork)
{
throw new NotSupportedException(
$"Invalid State for returning result, must be in state {nameof(GrainState.CompletedWork)} to return result");
}
State = GrainState.ShouldDispose;
return Task.FromResult(Result);
}
}
Grain.cs
public class Grain : GrainBase<MyType>, IGrain<MyType>
{
private Parameters _param;
public async Task<bool> BeginWorkAsync(Parameters param)
{
_param = param;
return await BeginGrainWorkAsync();
}
protected override Task DoWorkAsync()
{
Result = null; // CPU bound work
State = GrainState.CompletedWork;
return Task.CompletedTask;
}
}
DifferentGrain.cs
public class DifferentGrain : GrainBase<MyTypeDifferent>, IGrainDifferent<MyTypeDifferent>
{
private ParametersDifferent _param;
public async Task<bool> BeginDifferentWorkAsync(ParametersDifferent param)
{
_param = param;
return await BeginGrainWorkAsync();
}
protected override Task DoWorkAsync()
{
Result = null; // CPU bound work
State = GrainState.CompletedWork;
return Task.CompletedTask;
}
}
I need some error handling in there, and better consideration of how I'm actually starting my long running CPU work near the FireAndForget. I'm not sure how I feel exactly about my base class, but I do like how it leaves very little to implement on a per-grain basis. It just sets up a very specific workflow: the implementation calls into the base, which calls into the implementation of the abstract method, etc.
Hopefully this seems to make sense, and if it does, it's out there for the next guy! :)
And if not, appreciate any help tightening things up.
The problem I see with this implementation is that DoWorkAsync mutates state of the grain while executing on a parallel (.NET thread pool) thread. To avoid potential concurrent execution of that code in a race with another method of the grain, it needs to wrap that logic in a lambda scheduled on the grain's scheduling context as shown in http://dotnet.github.io/orleans/Documentation/Advanced-Concepts/External-Tasks-and-Grains.html. Something like this:
Task t2 = Task.Factory.StartNew(() =>
{
// This code runs on the Orleans task scheduler since we specified the scheduler: orleansTs.
Result = null; // CPU bound work
State = GrainState.CompletedWork;
return Task.CompletedTask;
}, CancellationToken.None, TaskCreationOptions.None, scheduler: orleansTs);
@sergeybykov Wouldn't doing it on the orleansTs in effect block further Orleans calls until the long running task is complete? This is what I wanted to avoid. I wanted to mostly throw it out on a separate scheduler so that Orleans can continue to process messages and queue up additional tasks (or return them immediately when they're not long running).
I was hoping because I'm always going to be using new grain instances for each call, and it's always going to be in the flow BeginAsyncWork, PollUntilComplete, then GetResult, I wouldn't have any issues with the separately scheduled task changing the grain state from Working to Complete. The polling mechanism polls until the Complete state is encountered, and the DoWorkAsync doesn't set a new state until after setting the Result of the grain instance.
If I'm way off please let me know, I'm having trouble wrapping my head around task schedulers in general since they're so new to me and I haven't felt the pain that is doing it wrong yet :D
Wouldn't doing it on the orleansTs in effect block further orleans calls until the long running task is complete?
It will not if you schedule that task on orleansTs after the long running task completes its work.
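A rough sketch of that ordering, under some assumptions: the CPU-bound delegate runs on the .NET thread pool via Task.Run, and only the short completion step is scheduled on the scheduler passed in. `BackgroundWork` and `RunThenContinueOn` are hypothetical helper names, not part of Orleans. Inside a grain method, the scheduler argument would be the value of TaskScheduler.Current captured on the grain's own turn (the orleansTs from the snippet above).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundWork
{
    // Hypothetical helper: heavy work on the thread pool, then a short
    // completion step on the supplied scheduler.
    public static Task RunThenContinueOn<TResult>(
        Func<TResult> cpuBoundWork,
        Action<Task<TResult>> onCompleted,
        TaskScheduler scheduler)
    {
        // Task.Run always targets the default (thread pool) scheduler,
        // so the supplied scheduler stays free while the work runs.
        return Task.Run(cpuBoundWork)
            .ContinueWith(
                onCompleted,
                CancellationToken.None,
                TaskContinuationOptions.None,
                scheduler); // only this continuation runs on the supplied scheduler
    }
}
```

In the GrainBase example, the continuation is where Result and State would be assigned, for instance `t => { Result = t.Result; State = GrainState.CompletedWork; }`. The continuation also runs when the work faults, so it would be worth checking t.IsFaulted before reading t.Result and setting the state to Faulted in that case.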
Thanks for all the help @sergeybykov ! 👍