Orleans: Another long-running tasks question

Created on 11 Mar 2020 · 2 Comments · Source: dotnet/orleans

Hello!
I'm having some difficulty with code organization.
The task is simple: I need to call an external service and process the result.
The problem is that this external call may take up to 3 minutes, and I don't want to block the PaymentGrain grain during the call. How should I organize the code?

public class PaymentGrain : Grain, IPaymentGrain
{
    public async Task Pay()
    {
        var payRequest = new HttpRequestMessage();
        var payResponse = await _httpClient.SendAsync(payRequest); // it may take up to 3 minutes
        // Process result
    }
    ...

One option I see is to call the external service outside of the grain.

public class Worker 
{
    public async Task Pay() {
        var payRequest = new HttpRequestMessage();
        var payResponse = await _httpClient.SendAsync(payRequest); // it may take up to 3 minutes
        await _grain.ProcessPayResult(payResponse); // await the grain call so failures are observed
    }
    ...
public class PaymentGrain : Grain, IPaymentGrain
{
    public async Task ProcessPayResult(PayResponse payResponse)
    {
        // Process result
    }
    ...

Another option is to create a separate grain for each long-running task, or something like that, but it looks weird.

What is the idiomatic way to do that?

P.S.: I know that I must not execute long-running tasks on the Orleans task scheduler. Let's omit this part for now.

Most helpful comment

It's fine to spin up long running Tasks inside grains - just don't let them block the grain method they started from. Here is a basic/naive pseudo-code example, using your business context:

public class PaymentGrain: Grain, IPaymentGrain
{
    private Task _myLongRunningTask;
    private CancellationTokenSource _cancellation = new CancellationTokenSource();

    public Task StartPayAsync()
    {
        // Pick ONE of the two options below; running both would start the work twice.

        // option 1: let it run in the current grain activation scheduler
        _myLongRunningTask = Task.Factory.StartNew(_ => DoLongRunningWorkAsync(), null, _cancellation.Token, TaskCreationOptions.DenyChildAttach, TaskScheduler.Current).Unwrap();

        // option 2: make it run on the thread pool scheduler instead
        // _myLongRunningTask = Task.Run(() => DoLongRunningWorkAsync(), _cancellation.Token);

        return Task.CompletedTask;
    }

    public Task<TaskStatus?> GetStatusAsync()
    {
        return Task.FromResult(_myLongRunningTask?.Status);
    }

    public Task StopAsync()
    {
        _cancellation.Cancel();
        return Task.CompletedTask;
    }

    private async Task DoLongRunningWorkAsync()
    {
        // do long running stuff here as normal, check cancellation token as needed
    }
}

The above is off the top of my head but it shows the gist of it. Spin the task up, keep it somewhere you can access it, and provide a way to check on its status and maybe cancel it.

The two options there refer to whether to spin that task on the activation scheduler or on the standard thread pool scheduler. There are trade-offs to either choice:

  • Activation Scheduler: Lets you run the task in the context of the grain and therefore allows direct access to grain state while maintaining turn-based concurrency guarantees. This is fine for I/O bound work. However, CPU bound work will hog the activation scheduler and keep other grain methods from taking a turn at it. Orleans will log warnings if turns take longer than 200ms (configurable), so you'll hopefully know if that's the case.

  • Thread Pool Scheduler: Lets you run the task on its own, outside the grain. This will forgo the grain's concurrency guarantees and will make it unsafe to access grain state, as the code may run in parallel with other grain activity. In return, CPU bound work won't stop other grain methods from executing.

Once you've made your choice, there are plenty of options to model the long-running work. Here is a far from exhaustive list:

  • Polling model (such as above). Spin it up and provide a grain method to check the task status. The caller can poll via a timer or some retry policy from a library like Polly if they wish.
  • Callback model: Let the StartPayAsync method receive a grain interface to call back on. On task completion, call an agreed-upon method on that interface.
  • Stream model: Receive the task from a stream and spin it up. On completion, push it to the next stream. Allow enough time in the visibility settings of the underlying stream technology, so the task doesn't reappear early if it takes long to complete. The Orleans virtual stream abstraction won't necessarily work with this, as it is bound to the same grain method execution limits (unless you raise them).
  • Ready Cache model: Spin up the task and return a generated ID. On completion, put the results of the task into a local dictionary under that ID. Provide a method to poll and fetch the result of the task.
  • Reactive Caching model: Take any polling approach and add the reactive caching pattern from the samples.
  • Saga model: For long-running transactional work between multiple components. You can model grain messages as saga(ish) todo-list envelopes.
  • TPL Dataflow: For complex graph orchestration, the world is your oyster.
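
To make the Ready Cache model from the list above a bit more concrete, here is a minimal sketch in plain C# with no Orleans dependency. All names (ReadyCache, Start, TryGetResult) are hypothetical and not part of any Orleans API. It is also a slight variation on the description: it keeps the Task itself in the dictionary rather than copying the result in on completion, so a faulted job stays observable. In a real grain the class would derive from Grain and expose these methods through a grain interface, which this sketch does not attempt.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not an Orleans type. Sketch only.
public class ReadyCache<TResult>
{
    // ConcurrentDictionary is used because this standalone sketch runs the work
    // on the thread pool (the "option 2" style), so access may happen in parallel.
    private readonly ConcurrentDictionary<Guid, Task<TResult>> _jobs =
        new ConcurrentDictionary<Guid, Task<TResult>>();

    // Starts the work without awaiting it and returns an ID the caller can poll with.
    public Guid Start(Func<Task<TResult>> work)
    {
        var id = Guid.NewGuid();
        _jobs[id] = Task.Run(work);
        return id;
    }

    // Returns true and the result only once the job has completed successfully.
    // Unknown IDs, running jobs, and faulted or canceled jobs all return false.
    public bool TryGetResult(Guid id, out TResult result)
    {
        if (_jobs.TryGetValue(id, out var job) && job.Status == TaskStatus.RanToCompletion)
        {
            result = job.Result; // does not block: the task has already completed
            return true;
        }

        result = default(TResult);
        return false;
    }
}
```

A caller would keep the Guid returned by Start and call TryGetResult on a timer or with a retry policy until it returns true. The sketch never evicts entries, so a real version would need some expiry policy.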

With all the choices above, you'll also have to design recovery for when the long-running task faults or the host itself collapses, taking any running tasks with it. In a transactional context such as payments, there's the extra care of ensuring processes succeed or fail as a group, so carefully consider which option is best for the use case, the requirements, and the environment they must run on.
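
As a rough illustration of the "task faults" half of that recovery concern, the sketch below wraps the long-running work so that success, failure and cancellation are recorded explicitly instead of being lost in an unobserved Task. TrackedJob and JobState are hypothetical names, not Orleans types. It only covers faults inside a live process; surviving a host collapse would additionally need the state persisted somewhere durable, which this sketch does not attempt.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum JobState { NotStarted, Running, Succeeded, Failed, Canceled }

// Hypothetical helper, not an Orleans type. Sketch only.
public class TrackedJob
{
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
    private int _state = (int)JobState.NotStarted;

    // Volatile reads and writes so a poller on another thread sees the latest state.
    public JobState State => (JobState)Volatile.Read(ref _state);

    // Message of the exception that failed the job, if any.
    public string Error { get; private set; }

    // Completes when the work has finished, whatever the outcome; never faults.
    public Task Completion { get; private set; } = Task.CompletedTask;

    // Starts the work without awaiting it, so the caller is not held up.
    public void Start(Func<CancellationToken, Task> work)
    {
        Volatile.Write(ref _state, (int)JobState.Running);
        Completion = RunAsync(work);
    }

    public void Stop() => _cancellation.Cancel();

    private async Task RunAsync(Func<CancellationToken, Task> work)
    {
        try
        {
            await work(_cancellation.Token);
            Volatile.Write(ref _state, (int)JobState.Succeeded);
        }
        catch (OperationCanceledException)
        {
            Volatile.Write(ref _state, (int)JobState.Canceled);
        }
        catch (Exception ex)
        {
            Error = ex.Message; // written before the state so readers of Failed see it
            Volatile.Write(ref _state, (int)JobState.Failed);
        }
    }
}
```

A status method in the style of GetStatusAsync could then return State and Error, which lets the caller decide whether to retry or compensate.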

All 2 comments

@JorgeCandeias
Thank you for a very detailed answer! :)
