Azure-functions-durable-extension: Throttling concurrent activities programmatically

Created on 1 Feb 2019 · 17 Comments · Source: Azure/azure-functions-durable-extension

I'm looking for a suitable pattern for throttling the number of activities that are processed concurrently. I have an orchestrator function that needs to execute a list of activities. I don't want them all to start simultaneously; instead, I want to control the concurrency level through a parameter received by the function. I'm aware of the maxConcurrentActivityFunctions setting, but it operates at the host level and, as far as I know, cannot be changed programmatically.

My initial attempt was to try something similar to the code below, but it does not appear to work within an orchestrator.

```C#
using (var throttle = new SemaphoreSlim(input.DegreeOfParallelism))
{
    var results = input.Operations.Select(async operation =>
    {
        try
        {
            await throttle.WaitAsync();
            return context.CallActivityAsync("ExecuteOperation", operation);
        }
        finally
        {
            throttle.Release();
        }
    });

    await Task.WhenAll(results);
}
```

Any suggestions?

All 17 comments

You should only be doing "await" on Durable Function dependencies, nothing else, while executing code in the orchestrator. Ref: https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-checkpointing-and-replay#orchestrator-code-constraints

One easy solution I would suggest is to create batches of activities and await them instead to get started. You won't get the full parallelism all the time but that's an easy start to get familiar with Durable Functions:
1) Take input.DegreeOfParallelism items from input.Operations.
2) Start a CallActivityAsync for each Operation taken in step 1).
3) Await Task.WhenAll on all those activities.
4) Go back to 1) and take the next input.DegreeOfParallelism items.
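
The batching steps above could be sketched roughly as follows. This is a plain TypeScript illustration rather than Durable Functions code: `toBatches` is a hypothetical helper, and the comment marks where a real orchestrator would schedule one activity per item and wait for the whole batch before moving on.

```typescript
// Split a list into consecutive batches of at most `size` items (hypothetical helper).
function toBatches<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const result: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    result.push(items.slice(i, i + size));
  }
  return result;
}

const operations = ["a", "b", "c", "d", "e"]; // stand-ins for input.Operations
const degreeOfParallelism = 2;                // stand-in for input.DegreeOfParallelism

const batches = toBatches(operations, degreeOfParallelism);
for (const batch of batches) {
  // In a real orchestrator: start one activity per item in `batch`,
  // then wait for all of them before taking the next batch.
  console.log(`batch of ${batch.length}: ${batch.join(", ")}`);
}
```

The last batch may be smaller than the others, and every batch waits for its slowest activity, which is why this approach does not always reach the full degree of parallelism.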

You definitely do not want to do SemaphoreSlim.WaitAsync() in an orchestrator function since that breaks the rule about doing non-durable async work.

If you want to go down your current route, I would suggest using a combination of loops, a count integer to help count the number of in-flight activity functions, and Task.WhenAny so that you can block your loop from scheduling new activity functions until a previously scheduled activity completes. If you do this, definitely share your solution as I'd love to see it and be able to share with others. :)

Another option is to use a sub-orchestrator and have fixed-size parallel batches execute in each of your sub-orchestrations.

Thanks Simon and Chris. That helps. I have this so far and it appears to do the job, at least after a very quick test.

``` C#
var pendingOperations = new List<Task<string>>();
foreach (var operation in input.Operations)
{
    if (pendingOperations.Count >= input.DegreeOfParallelism)
    {
        await Task.WhenAny(pendingOperations.ToArray());
        pendingOperations = pendingOperations.Where(p => !p.IsCompleted).ToList();
    }

    var result = context.CallActivityAsync<string>("ExecuteOperation", operation);
    pendingOperations.Add(result);
}

await Task.WhenAll(pendingOperations.ToArray());
```
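
To illustrate why a Task.WhenAny loop like the one above can finish sooner than fixed batches, here is a small deterministic simulation in TypeScript. It does not call Durable Functions at all: the activity durations are made-up numbers, and both function names are hypothetical. It only models when each activity would start and finish under the two strategies.

```typescript
// Fixed batches: each batch takes as long as its slowest activity.
function batchedDuration(durations: readonly number[], limit: number): number {
  let total = 0;
  for (let i = 0; i < durations.length; i += limit) {
    total += Math.max(...durations.slice(i, i + limit));
  }
  return total;
}

// Sliding window: a new activity starts as soon as any in-flight one finishes.
function slidingWindowDuration(durations: readonly number[], limit: number): number {
  const finishTimes: number[] = []; // finish times of in-flight activities
  let now = 0;
  let end = 0;
  for (const d of durations) {
    if (finishTimes.length >= limit) {
      // Equivalent of waiting for "any" task: jump to the earliest finish time.
      finishTimes.sort((a, b) => a - b);
      now = finishTimes.shift() as number;
    }
    const finish = now + d;
    finishTimes.push(finish);
    end = Math.max(end, finish);
  }
  return end;
}

const durations = [5, 1, 1, 5, 1, 1]; // hypothetical activity durations, in seconds
const limit = 2;

const batched = batchedDuration(durations, limit);       // 5 + 5 + 1 = 11
const sliding = slidingWindowDuration(durations, limit); // 7
console.log({ batched, sliding });
```

With these assumed durations, the batched run spends most of its time waiting for one slow activity per batch, while the sliding window keeps the second slot busy with short activities.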

I realized the code above did not preserve the results from each activity. I cleaned it up a bit and created an extension method that you can use to call activities that return a TResult from an input list of objects of type T.

``` C#
// Extension methods must be declared in a static class.
public static async Task<TResult[]> ThrottleActivities<TResult, T>(
    this DurableOrchestrationContext context,
    IEnumerable<T> activities,
    string functionName,
    int degreeOfParallelism)
{
    var runningActivities = new List<Task<TResult>>();
    foreach (var activity in activities)
    {
        // Once the limit is reached, wait for any in-flight activity to finish
        // before scheduling the next one.
        var pendingOperations = runningActivities.Where(p => !p.IsCompleted);
        if (pendingOperations.Count() >= degreeOfParallelism)
        {
            await Task.WhenAny(pendingOperations);
        }

        var result = context.CallActivityAsync<TResult>(functionName, activity);
        runningActivities.Add(result);
    }

    // Task.WhenAll returns the results in the same order as the input list.
    var results = await Task.WhenAll(runningActivities);

    return results;
}
```

For example, the following invokes activities that return strings for an input list of objects of type Operation and ensures that no more than 5 run concurrently:

``` C#
[FunctionName("ExecuteBatchJob")]
public static async Task ExecuteBatchJobAsync(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    ILogger logger)
{
    var input = context.GetInput<BatchJob>();

    var results = await context.ThrottleActivities<string, Operation>(
        input.Operations,
        "ExecuteOperation",
        5);

    logger.LogDebug("Completed all operations!");
}
```

Awesome. Thanks for sharing this. I'll go ahead and close this for now, but let us know if you have any further questions related to this.

I tried to implement the same pattern using TypeScript, but it didn't work. Each time the orchestrator function re-enters, the state is lost, and runningActivities becomes empty.
I would expect the function to run forever; however, it stops and is marked as completed after only 1 activity.

What's the right way of batching operations in TypeScript? My operations write to Cosmos DB, and if I don't throttle them they completely overrun the allotted Cosmos DB capacity (which is at max).

    const runningActivities: Task[] = [];
    let results;
    for (const operation of operations) {
        const pendingOperations = runningActivities.filter(op => !op.isCompleted);
        if (pendingOperations.length >= degreeOfParallelism) {
            yield context.df.Task.any(pendingOperations);
        }
        const result = context.df.callActivity('a_ingressContainer', {
            operation: operation,
            ingressParams: input,
        });
        runningActivities.push(result);
    }
    results = context.df.Task.all(runningActivities);
    return results;

Sorry, I'm not familiar with Azure Functions implemented in TypeScript. However, one of the constraints documented here is:

JavaScript orchestrator functions cannot be async. They must be declared as synchronous generator functions.

Could that be relevant?

Also, and as I said before, I'm not a TypeScript user, but shouldn't the line:

yield context.df.Task.any(pendingOperations);

be

await context.df.Task.any(pendingOperations);

@f2bo: Durable Functions in JavaScript uses generators under the covers to achieve its replay behavior and maintain synchronous operation; thus, async activities must be scheduled through context.df and use the generator syntax of yield, rather than await, so yield context.df.Task.any(pendingOperations); is correct.
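
As a rough illustration of the replay idea described above, the following TypeScript sketch drives a generator from the start on every pass and answers each `yield` from a recorded log. It is not how the durable-functions library is actually implemented: `replay`, `Orchestrator`, `eventLog`, and the string task names are all hypothetical, chosen only to show why `yield` (rather than `await`) gives the caller control over when the function resumes.

```typescript
// A "task" here is just a name the orchestrator wants a result for (hypothetical model).
type Orchestrator = () => Generator<string, string, string>;

// Re-run the generator from the top, answering each yield from the recorded log.
// Returns the final value once the log covers every yield, otherwise the pending task name.
function replay(
  orchestrator: Orchestrator,
  log: Map<string, string>
): { done: boolean; value: string } {
  const gen = orchestrator();
  let step = gen.next(); // run to the first yield
  while (!step.done) {
    const recorded = log.get(step.value);
    if (recorded === undefined) {
      return { done: false, value: step.value }; // nothing recorded yet: stop and wait
    }
    step = gen.next(recorded); // resume the generator with the recorded result
  }
  return { done: true, value: step.value };
}

const greet: Orchestrator = function* () {
  const first = yield "hello:Tokyo";
  const second = yield "hello:Seattle";
  return `${first}, ${second}`;
};

const eventLog = new Map<string, string>();
const firstPass = replay(greet, eventLog);  // stops at the first yield
eventLog.set("hello:Tokyo", "Hello Tokyo");
const secondPass = replay(greet, eventLog); // replays the first yield, stops at the second
eventLog.set("hello:Seattle", "Hello Seattle");
const finalPass = replay(greet, eventLog);  // replays both yields and completes
console.log(firstPass, secondPass, finalPass);
```

Because every pass restarts the generator, local variables are rebuilt from the log each time, so the orchestrator body has to behave the same way on every pass.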

However, @bernitorres, I think you're missing a yield on the second-to-last line of your code:

results = context.df.Task.all(runningActivities)

should be

results = yield context.df.Task.all(runningActivities);

Thanks @kashimiz, I added the missing yield on the line you pointed out, but the orchestration still finishes after a number of activities equal to degreeOfParallelism have been completed. In the following example this means that the orchestration is marked as completed after completing only 3 activities, instead of containers.length (which is 31). When debugging, I can see that the code after the for loop is never reached.

I updated the durable functions package to 1.8.0, hoping that could maybe fix it, but no dice.

What could be the reason for the orchestration ending prematurely?

This is the exact code I am running:

    const degreeOfParallelism = 3;
    const runningActivities: Task[] = [];
    let results;
    for (const operation of containers) {
        const pendingOperations = runningActivities.filter(op => !op.isCompleted);
        if (pendingOperations.length >= degreeOfParallelism) {
            yield context.df.Task.any(pendingOperations);
        }
        const activity = context.df.callActivity('a_ingressContainer', {
            container: operation,
            ingressParams: input,
        });
        runningActivities.push(activity);
    }
    results = yield context.df.Task.all(runningActivities);

    context.df.setCustomStatus(`Done. Processed ${containers.length} containers.`);
    return results;

Ping, has anybody figured out how to throttle programmatically using TypeScript/JavaScript?

Alternatively, is there an issue tracking why this code doesn't work?

Hi @f2bo @cgillum !
The following code, which uses Task.WhenAll and CallActivityAsync, is not working: I get an InvalidException even though it's very similar to your code. Any idea?

            var productList = await this.productService.ImportFilesAsync();
            var listTaks = new List<Task>();

            foreach (var item in productList)
            {
                var processItemTask = context.CallActivityAsync(SYNC_PRODUCTITEM, item);
                listTaks.Add(processItemTask);
            }

            await Task.WhenAll(listTaks);

I should point out that this code is executed in a sub-orchestrator function.

What's an InvalidException? Do you have a stack trace?

I don't see anything that stands out except that there's no throttling in your code. All the activities are launched concurrently. Also, I don't know what ImportFilesAsync does but presumably that's not where the exception occurs and it returns a valid input for your activities, correct?

I changed this code to run in the main orchestrator function, and it worked.
Sorry, I don't have the stack trace anymore.
ImportFiles just imports a CSV file with an I/O operation, and yes, the error occurred on the line CallActivityAsync(SYNC_PRODUCTITEM, item);
But my code now works since I moved this parallel processing into the main orchestrator.

@cgillum I'm fairly new to durable functions so sorry if this is a noob question, but I'm trying to wrap my head around why Task.WhenAll/Task.WhenAny are ok to use in an orchestrator function but SemaphoreSlim.WaitAsync isn't. On the surface it seems neither are themselves "durable" or even deterministic, yet both are mostly just compositions of other tasks that _are_ durable. So intuitively it seems like the rules would be the same.

I suppose the more I think about it, a SemaphoreSlim has some state it needs to track where the others I'm guessing don't? Is that the important difference here?

As a side note, it would be helpful if it were documented which non-"durable" awaitables (Task.WhenAll, Task.WhenAny, others?) are allowed. I imagine it's a very short list, but I didn't see any mention of it here.

Hey @tmenier, totally understandable that this isn't super clear. It might help to think of Task.WhenAll and Task.WhenAny as APIs that help you orchestrate existing tasks. In this case, they are helping you orchestrate existing "durable" tasks. They don't do any I/O or spin up any new threads themselves, which is why they are safe to use in orchestrations.

The reason SemaphoreSlim is not allowed is because it will resume from the await on a worker pool thread instead of using the Durable orchestration execution thread. This causes the orchestration executor to get confused. That said, since orchestrations are single-threaded, I don't think there should be a scenario that requires a semaphore.

Point taken about the documentation. I'm not personally aware of others besides Task.WhenAll and Task.WhenAny though.

@cgillum Thanks for the explanation, that all makes perfect sense. I know this issue is long since closed but FWIW I think a function-level throttling feature like this would be a fantastic addition to the SDK. Thanks @f2bo for the code sample, this should work well for my scenario!
